From d97e3a6b7b38b62703b111c08daaafc06ebbb54b Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 6 Sep 2023 16:38:30 +0100 Subject: [PATCH 1/7] Add AddAddresses method to DHT --- v2/coord/conversion.go | 5 + v2/coord/coordinator.go | 48 ++-- v2/coord/coordinator_test.go | 12 +- v2/coord/event.go | 5 +- v2/coord/internal/nettest/layouts.go | 4 +- v2/coord/internal/nettest/topology.go | 4 +- v2/coord/network_test.go | 2 +- v2/coord/routing.go | 7 +- v2/coord/routing_test.go | 2 +- v2/dht.go | 18 +- v2/dht_test.go | 53 ++++ v2/handlers_test.go | 28 +- v2/{coord => }/internal/kadtest/context.go | 0 v2/query_test.go | 288 +++++++++++++++++++++ v2/stream_test.go | 2 +- 15 files changed, 429 insertions(+), 49 deletions(-) rename v2/{coord => }/internal/kadtest/context.go (100%) create mode 100644 v2/query_test.go diff --git a/v2/coord/conversion.go b/v2/coord/conversion.go index dadc0bcc6..3a6b0ba85 100644 --- a/v2/coord/conversion.go +++ b/v2/coord/conversion.go @@ -27,6 +27,11 @@ func NodeIDToAddrInfo(id kad.NodeID[KadKey]) peer.AddrInfo { } } +// AddrInfoToNodeID converts a peer.AddrInfo to a kad.NodeID. +func AddrInfoToNodeID(ai peer.AddrInfo) kad.NodeID[KadKey] { + return kadt.PeerID(ai.ID) +} + // SliceOfNodeInfoToSliceOfAddrInfo converts a kad.NodeInfo to a peer.AddrInfo. // This function will panic if any info.ID() does not return a kadt.PeerID func SliceOfNodeInfoToSliceOfAddrInfo(infos []kad.NodeInfo[KadKey, ma.Multiaddr]) []peer.AddrInfo { diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 9d33066fc..e5c55867c 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -27,6 +27,9 @@ type Coordinator struct { // self is the peer id of the system the dht is running on self peer.ID + // cancel is used to cancel all running goroutines when the coordinator is cleaning up + cancel context.CancelFunc + // cfg is a copy of the optional configuration supplied to the dht cfg CoordinatorConfig @@ -48,8 +51,6 @@ type Coordinator struct { queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent] } -const DefaultChanqueueCapacity = 1024 - type CoordinatorConfig struct { PeerstoreTTL time.Duration // duration for which a peer is kept in the peerstore @@ -111,7 +112,7 @@ func (cfg *CoordinatorConfig) Validate() error { func DefaultConfig() *CoordinatorConfig { return &CoordinatorConfig{ - Clock: clock.New(), // use standard time + Clock: clock.New(), PeerstoreTTL: 10 * time.Minute, QueryConcurrency: 3, QueryTimeout: 5 * time.Minute, @@ -181,23 +182,32 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger) + ctx, cancel := context.WithCancel(context.Background()) + d := &Coordinator{ - self: self, - cfg: *cfg, - rtr: rtr, - rt: rt, + self: self, + cfg: *cfg, + rtr: rtr, + rt: rt, + cancel: cancel, networkBehaviour: networkBehaviour, routingBehaviour: routingBehaviour, queryBehaviour: queryBehaviour, - routingNotifications: make(chan RoutingNotification, 20), + routingNotifications: make(chan RoutingNotification, 20), // buffered mainly to allow tests to read the channel after running an operation } - go d.eventLoop() + go d.eventLoop(ctx) return d, nil } +// Close cleans up all resources associated with this Coordinator. 
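+// It cancels the coordinator's context, which stops the event loop goroutine.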
+func (d *Coordinator) Close() error { + d.cancel() + return nil +} + func (d *Coordinator) ID() peer.ID { return d.self } @@ -216,13 +226,16 @@ func (d *Coordinator) RoutingNotifications() <-chan RoutingNotification { return d.routingNotifications } -func (d *Coordinator) eventLoop() { - ctx := context.Background() - +func (d *Coordinator) eventLoop(ctx context.Context) { + ctx, span := util.StartSpan(ctx, "Coordinator.eventLoop") + defer span.End() for { var ev BehaviourEvent var ok bool select { + case <-ctx.Done(): + // coordinator is closing + return case <-d.networkBehaviour.Ready(): ev, ok = d.networkBehaviour.Perform(ctx) case <-d.routingBehaviour.Ready(): @@ -384,17 +397,20 @@ func (d *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q // AddNodes suggests new DHT nodes and their associated addresses to be added to the routing table. // If the routing table is updated as a result of this operation an EventRoutingUpdated notification // is emitted on the routing notification channel. -func (d *Coordinator) AddNodes(ctx context.Context, infos []peer.AddrInfo) error { +func (d *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error { ctx, span := util.StartSpan(ctx, "Coordinator.AddNodes") defer span.End() - for _, info := range infos { - if info.ID == d.self { + for _, ai := range ais { + if ai.ID == d.self { // skip self continue } + // TODO: apply address filter + d.routingBehaviour.Notify(ctx, &EventAddAddrInfo{ - NodeInfo: info, + NodeInfo: ai, + TTL: ttl, }) } diff --git a/v2/coord/coordinator_test.go b/v2/coord/coordinator_test.go index 213c35882..d6b1b8939 100644 --- a/v2/coord/coordinator_test.go +++ b/v2/coord/coordinator_test.go @@ -12,8 +12,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" - "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" ) @@ -173,7 +173,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { require.NoError(t, err) tev := ev.(*EventRoutingUpdated) - require.Equal(t, nodes[2].NodeInfo.ID, NodeIDToPeerID(tev.NodeInfo.ID())) + require.Equal(t, nodes[2].NodeInfo.ID, tev.NodeInfo.ID) // no EventRoutingUpdated is sent for the self node @@ -184,7 +184,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { require.NoError(t, err) tev = ev.(*EventRoutingUpdated) - require.Equal(t, nodes[3].NodeInfo.ID, NodeIDToPeerID(tev.NodeInfo.ID())) + require.Equal(t, nodes[3].NodeInfo.ID, tev.NodeInfo.ID) } func TestBootstrap(t *testing.T) { @@ -273,7 +273,7 @@ func TestIncludeNode(t *testing.T) { events := d.RoutingNotifications() // inject a new node into the dht's includeEvents queue - err = d.AddNodes(ctx, []peer.AddrInfo{candidate}) + err = d.AddNodes(ctx, []peer.AddrInfo{candidate}, time.Minute) require.NoError(t, err) // the include state machine runs in the background and eventually should add the node to routing table @@ -281,9 +281,9 @@ func TestIncludeNode(t *testing.T) { require.NoError(t, err) tev := ev.(*EventRoutingUpdated) - require.Equal(t, candidate.ID, NodeIDToPeerID(tev.NodeInfo.ID())) + require.Equal(t, candidate.ID, tev.NodeInfo.ID) - // the routing table should not contain the node yet + // the routing table should now contain the node _, err = d.GetNode(ctx, candidate.ID) require.NoError(t, err) } diff --git 
a/v2/coord/event.go b/v2/coord/event.go index f4ecae4d5..4d5790f77 100644 --- a/v2/coord/event.go +++ b/v2/coord/event.go @@ -1,6 +1,8 @@ package coord import ( + "time" + "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/plprobelab/go-kademlia/kad" @@ -86,6 +88,7 @@ func (*EventStopQuery) queryCommand() {} type EventAddAddrInfo struct { NodeInfo peer.AddrInfo + TTL time.Duration } func (*EventAddAddrInfo) behaviourEvent() {} @@ -133,7 +136,7 @@ func (*EventQueryFinished) behaviourEvent() {} // EventRoutingUpdated is emitted by the coordinator when a new node has been verified and added to the routing table. type EventRoutingUpdated struct { - NodeInfo kad.NodeInfo[KadKey, ma.Multiaddr] + NodeInfo peer.AddrInfo } func (*EventRoutingUpdated) behaviourEvent() {} diff --git a/v2/coord/internal/nettest/layouts.go b/v2/coord/internal/nettest/layouts.go index 7aa548bff..a49a94bb3 100644 --- a/v2/coord/internal/nettest/layouts.go +++ b/v2/coord/internal/nettest/layouts.go @@ -19,10 +19,10 @@ import ( // The topology is not a ring: nodes[0] only has nodes[1] in its table and nodes[n-1] only has nodes[n-2] in its table. // nodes[1] has nodes[0] and nodes[2] in its routing table. // If n > 2 then the first and last nodes will not have one another in their routing tables. -func LinearTopology(n int, clk *clock.Mock) (*Topology, []*Node, error) { +func LinearTopology(n int, clk clock.Clock) (*Topology, []*Node, error) { nodes := make([]*Node, n) - top := INewTopology(clk) + top := NewTopology(clk) for i := range nodes { a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i)) diff --git a/v2/coord/internal/nettest/topology.go b/v2/coord/internal/nettest/topology.go index c275e1ad2..c7aae8d5d 100644 --- a/v2/coord/internal/nettest/topology.go +++ b/v2/coord/internal/nettest/topology.go @@ -21,14 +21,14 @@ type Node struct { } type Topology struct { - clk *clock.Mock + clk clock.Clock links map[string]Link nodes []*Node nodeIndex map[peer.ID]*Node routers map[peer.ID]*Router } -func INewTopology(clk *clock.Mock) *Topology { +func NewTopology(clk clock.Clock) *Topology { return &Topology{ clk: clk, links: make(map[string]Link), diff --git a/v2/coord/network_test.go b/v2/coord/network_test.go index a596edfb6..cbba0f590 100644 --- a/v2/coord/network_test.go +++ b/v2/coord/network_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/slog" - "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" ) diff --git a/v2/coord/routing.go b/v2/coord/routing.go index 11e70c41f..a4f918dbe 100644 --- a/v2/coord/routing.go +++ b/v2/coord/routing.go @@ -80,6 +80,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { if ev.NodeInfo.ID == r.self { break } + // TODO: apply ttl cmd := &routing.EventIncludeAddCandidate[KadKey, ma.Multiaddr]{ NodeInfo: kadt.AddrInfo{Info: ev.NodeInfo}, } @@ -92,7 +93,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { case *EventRoutingUpdated: span.SetAttributes(attribute.String("event", "EventRoutingUpdated")) cmd := &routing.EventProbeAdd[KadKey]{ - NodeID: ev.NodeInfo.ID(), + NodeID: AddrInfoToNodeID(ev.NodeInfo), } // attempt to advance the probe state machine next, ok := r.advanceProbe(ctx, cmd) @@ -294,12 +295,12 @@ func (r 
*RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ // notify other routing state machines that there is a new node in the routing table r.notify(ctx, &EventRoutingUpdated{ - NodeInfo: st.NodeInfo, + NodeInfo: NodeInfoToAddrInfo(st.NodeInfo), }) // return the event to notify outwards too return &EventRoutingUpdated{ - NodeInfo: st.NodeInfo, + NodeInfo: NodeInfoToAddrInfo(st.NodeInfo), }, true case *routing.StateIncludeWaitingAtCapacity: // nothing to do except wait for message response or timeout diff --git a/v2/coord/routing_test.go b/v2/coord/routing_test.go index 6b6e0498b..2b316fb67 100644 --- a/v2/coord/routing_test.go +++ b/v2/coord/routing_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/slog" - "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) diff --git a/v2/dht.go b/v2/dht.go index 675c83bc9..69a58056a 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -1,15 +1,18 @@ package dht import ( + "context" "crypto/sha256" "fmt" "io" "sync" + "time" "github.com/ipfs/go-datastore/trace" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/routing" @@ -59,8 +62,9 @@ type DHT struct { func New(h host.Host, cfg *Config) (*DHT, error) { var err error - // check if the configuration is valid - if err = cfg.Validate(); err != nil { + if cfg == nil { + cfg = DefaultConfig() + } else if err = cfg.Validate(); err != nil { return nil, fmt.Errorf("validate DHT config: %w", err) } @@ -155,6 +159,10 @@ func (d *DHT) Close() error { d.log.With("err", err).Debug("failed closing event bus subscription") } + if err := d.kad.Close(); err != nil { + d.log.With("err", err).Debug("failed closing coordinator") + } + for ns, b := range d.backends { closer, ok := b.(io.Closer) if !ok { @@ -265,6 +273,12 @@ func (d *DHT) logErr(err error, msg string) { d.log.Warn(msg, "err", err.Error()) } +// AddAddresses suggests peers and their associated addresses to be added to the routing table. +// Addresses will be added to the peerstore with the supplied time to live. +func (d *DHT) AddAddresses(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error { + return d.kad.AddNodes(ctx, ais, ttl) +} + // newSHA256Key returns a [key.Key256] that conforms to the [kad.Key] interface by // SHA256 hashing the given bytes and wrapping them in a [key.Key256]. func newSHA256Key(data []byte) key.Key256 { diff --git a/v2/dht_test.go b/v2/dht_test.go index 832c8ed14..c47eda860 100644 --- a/v2/dht_test.go +++ b/v2/dht_test.go @@ -1,10 +1,19 @@ package dht import ( + "context" + "fmt" + "reflect" "testing" + "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/libp2p/go-libp2p-kad-dht/v2/coord" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" ) func TestNew(t *testing.T) { @@ -65,3 +74,47 @@ func TestNew(t *testing.T) { }) } } + +// expectEventType selects on the event channel until an event of the expected type is sent. 
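+// Events of other types are logged and ignored; an error is returned if ctx is done before a matching event arrives.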
+func expectEventType(t *testing.T, ctx context.Context, events <-chan coord.RoutingNotification, expected coord.RoutingNotification) (coord.RoutingNotification, error) { + t.Helper() + for { + select { + case ev := <-events: + t.Logf("saw event: %T\n", ev) + if reflect.TypeOf(ev) == reflect.TypeOf(expected) { + return ev, nil + } + case <-ctx.Done(): + return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected) + } + } +} + +func TestAddAddresses(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + local := newClientDht(t, nil) + + remote := newServerDht(t, nil) + + // Populate and entries in remote's routing table so it responds to a connectivity check + fillRoutingTable(t, remote, 1) + + // TODO: add method to DHT to return its address? + pstore := remote.host.Peerstore() + remoteAddrInfo := pstore.PeerInfo(remote.host.ID()) + + // Add remote's addresss to the local dht + err := local.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute) + require.NoError(t, err) + + // the include state machine runs in the background and eventually should add the node to routing table + _, err = expectEventType(t, ctx, local.kad.RoutingNotifications(), &coord.EventRoutingUpdated{}) + require.NoError(t, err) + + // the routing table should now contain the node + _, err = local.kad.GetNode(ctx, remote.host.ID()) + require.NoError(t, err) +} diff --git a/v2/handlers_test.go b/v2/handlers_test.go index 5c9b0c891..ff593e921 100644 --- a/v2/handlers_test.go +++ b/v2/handlers_test.go @@ -75,11 +75,11 @@ func newIdentity(t testing.TB) (peer.ID, crypto.PrivKey) { return id, priv } -func fillRoutingTable(t testing.TB, d *DHT) { +// fillRoutingTable populates d's routing table and peerstore with n random peers and addresses +func fillRoutingTable(t testing.TB, d *DHT, n int) { t.Helper() - // 250 is a common number of peers to have in the routing table - for i := 0; i < 250; i++ { + for i := 0; i < n; i++ { // generate peer ID pid := newPeerID(t) @@ -862,7 +862,7 @@ func TestDHT_handlePutValue_moved_from_v1_atomic_operation(t *testing.T) { func BenchmarkDHT_handleGetValue(b *testing.B) { d := newTestDHT(b) - fillRoutingTable(b, d) + fillRoutingTable(b, d, 250) rbe, ok := d.backends[namespaceIPNS].(*RecordBackend) require.True(b, ok) @@ -906,7 +906,7 @@ func BenchmarkDHT_handleGetValue(b *testing.B) { func TestDHT_handleGetValue_happy_path_ipns_record(t *testing.T) { d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) remote, priv := newIdentity(t) @@ -941,7 +941,7 @@ func TestDHT_handleGetValue_happy_path_ipns_record(t *testing.T) { func TestDHT_handleGetValue_record_not_found(t *testing.T) { d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) for _, ns := range []string{namespaceIPNS, namespacePublicKey} { t.Run(ns, func(t *testing.T) { @@ -965,7 +965,7 @@ func TestDHT_handleGetValue_record_not_found(t *testing.T) { func TestDHT_handleGetValue_corrupt_record_in_datastore(t *testing.T) { d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) for _, ns := range []string{namespaceIPNS, namespacePublicKey} { t.Run(ns, func(t *testing.T) { @@ -1003,7 +1003,7 @@ func TestDHT_handleGetValue_corrupt_record_in_datastore(t *testing.T) { func TestDHT_handleGetValue_ipns_max_age_exceeded_in_datastore(t *testing.T) { d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) remote, priv := newIdentity(t) @@ -1045,7 +1045,7 @@ func TestDHT_handleGetValue_ipns_max_age_exceeded_in_datastore(t 
*testing.T) { func TestDHT_handleGetValue_does_not_validate_stored_record(t *testing.T) { d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) rbe, err := typedBackend[*RecordBackend](d, namespaceIPNS) require.NoError(t, err) @@ -1128,7 +1128,7 @@ func TestDHT_handleGetValue_supports_providers(t *testing.T) { p := newAddrInfo(t) key := []byte("random-key") - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) // add to addresses peerstore d.host.Peerstore().AddAddrs(p.ID, p.Addrs, time.Hour) @@ -1355,7 +1355,7 @@ func BenchmarkDHT_handleGetProviders(b *testing.B) { ctx := context.Background() d := newTestDHT(b) - fillRoutingTable(b, d) + fillRoutingTable(b, d, 250) be, ok := d.backends[namespaceIPNS].(*RecordBackend) require.True(b, ok) @@ -1401,7 +1401,7 @@ func TestDHT_handleGetProviders_happy_path(t *testing.T) { ctx := context.Background() d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) key := []byte("random-key") @@ -1452,7 +1452,7 @@ func TestDHT_handleGetProviders_do_not_return_expired_records(t *testing.T) { ctx := context.Background() d := newTestDHT(t) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) key := []byte("random-key") @@ -1511,7 +1511,7 @@ func TestDHT_handleGetProviders_only_serve_filtered_addresses(t *testing.T) { d := newTestDHTWithConfig(t, cfg) - fillRoutingTable(t, d) + fillRoutingTable(t, d, 250) key := []byte("random-key") diff --git a/v2/coord/internal/kadtest/context.go b/v2/internal/kadtest/context.go similarity index 100% rename from v2/coord/internal/kadtest/context.go rename to v2/internal/kadtest/context.go diff --git a/v2/query_test.go b/v2/query_test.go new file mode 100644 index 000000000..996d6e833 --- /dev/null +++ b/v2/query_test.go @@ -0,0 +1,288 @@ +package dht + +import ( + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/stretchr/testify/require" +) + +// import ( +// "context" +// "fmt" +// "testing" +// "time" + +// "github.com/libp2p/go-libp2p" +// tu "github.com/libp2p/go-libp2p-testing/etc" +// "github.com/libp2p/go-libp2p/core/host" +// "github.com/libp2p/go-libp2p/core/peer" + +// "github.com/stretchr/testify/require" +// ) + +// TODO Debug test failures due to timing issue on windows +// Tests are timing dependent as can be seen in the 2 seconds timed context that we use in "tu.WaitFor". +// While the tests work fine on OSX and complete in under a second, +// they repeatedly fail to complete in the stipulated time on Windows. +// However, increasing the timeout makes them pass on Windows. 
+ +// func TestRTEvictionOnFailedQuery(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) +// defer cancel() + +// d1 := setupDHT(ctx, t, false) +// d2 := setupDHT(ctx, t, false) + +// for i := 0; i < 10; i++ { +// connect(t, ctx, d1, d2) +// for _, conn := range d1.host.Network().ConnsToPeer(d2.self) { +// conn.Close() +// } +// } + +// // peers should be in the RT because of fixLowPeers +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if !checkRoutingTable(d1, d2) { +// return fmt.Errorf("should have routes") +// } +// return nil +// })) + +// // close both hosts so query fails +// require.NoError(t, d1.host.Close()) +// require.NoError(t, d2.host.Close()) +// // peers will still be in the RT because we have decoupled membership from connectivity +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if !checkRoutingTable(d1, d2) { +// return fmt.Errorf("should have routes") +// } +// return nil +// })) + +// // failed queries should remove the peers from the RT +// _, err := d1.GetClosestPeers(ctx, "test") +// require.NoError(t, err) + +// _, err = d2.GetClosestPeers(ctx, "test") +// require.NoError(t, err) + +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if checkRoutingTable(d1, d2) { +// return fmt.Errorf("should not have routes") +// } +// return nil +// })) +// } + +// func newPeerPair(t testing.TB) (host.Host, *DHT) { +// listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0") + +// client, err := libp2p.New(listenAddr) +// require.NoError(t, err) + +// server, err := libp2p.New(listenAddr) +// require.NoError(t, err) + +// cfg := DefaultConfig() +// cfg.Mode = ModeOptServer +// serverDHT, err := New(server, cfg) + +// fillRoutingTable(t, serverDHT) + +// t.Cleanup(func() { +// if err = serverDHT.Close(); err != nil { +// t.Logf("failed closing DHT: %s", err) +// } + +// if err = client.Close(); err != nil { +// t.Logf("failed closing client host: %s", err) +// } + +// if err = server.Close(); err != nil { +// t.Logf("failed closing client host: %s", err) +// } +// }) + +// ctx := context.Background() +// err = client.Connect(ctx, peer.AddrInfo{ +// ID: server.ID(), +// Addrs: server.Addrs(), +// }) +// require.NoError(t, err) + +// return client, serverDHT +// } + +func newServerHost(t testing.TB) host.Host { + listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0") + + h, err := libp2p.New(listenAddr) + require.NoError(t, err) + + t.Cleanup(func() { + if err = h.Close(); err != nil { + t.Logf("unexpected error when closing host: %s", err) + } + }) + + return h +} + +func newClientHost(t testing.TB) host.Host { + h, err := libp2p.New(libp2p.NoListenAddrs) + require.NoError(t, err) + + t.Cleanup(func() { + if err = h.Close(); err != nil { + t.Logf("unexpected error when closing host: %s", err) + } + }) + + return h +} + +func newServerDht(t testing.TB, cfg *Config) *DHT { + h := newServerHost(t) + + d, err := New(h, cfg) + require.NoError(t, err) + + t.Cleanup(func() { + if err = d.Close(); err != nil { + t.Logf("unexpected error when closing dht: %s", err) + } + }) + return d +} + +func newClientDht(t testing.TB, cfg *Config) *DHT { + h := newClientHost(t) + + d, err := New(h, cfg) + require.NoError(t, err) + + t.Cleanup(func() { + if err = d.Close(); err != nil { + t.Logf("unexpected error when closing dht: %s", err) + } + }) + return d +} + +// func newTestDHTForHostWithConfig(t testing.TB, h host.Host, cfg *Config) *DHT { +// t.Helper() + +// h, err := libp2p.New(libp2p.NoListenAddrs) +// 
require.NoError(t, err) + +// d, err := New(h, cfg) +// require.NoError(t, err) + +// t.Cleanup(func() { +// if err = d.Close(); err != nil { +// t.Logf("closing dht: %s", err) +// } + +// if err = h.Close(); err != nil { +// t.Logf("closing host: %s", err) +// } +// }) + +// return d +// } + +// func connect(t *testing.T, ctx context.Context, a, b *DHT) { +// t.Helper() +// connectNoSync(t, ctx, a, b) +// wait(t, ctx, a, b) +// wait(t, ctx, b, a) +// } + +// func wait(t *testing.T, ctx context.Context, a, b *DHT) { +// t.Helper() + +// // loop until connection notification has been received. +// // under high load, this may not happen as immediately as we would like. +// for a.routingTable.Find(b.self) == "" { +// select { +// case <-ctx.Done(): +// t.Fatal(ctx.Err()) +// case <-time.After(time.Millisecond * 5): +// } +// } +// } + +// func connectNoSync(t *testing.T, ctx context.Context, a, b *DHT) { +// t.Helper() + +// idB := b.self +// addrB := b.peerstore.Addrs(idB) +// if len(addrB) == 0 { +// t.Fatal("peers setup incorrectly: no local address") +// } + +// if err := a.host.Connect(ctx, peer.AddrInfo{ID: idB, Addrs: addrB}); err != nil { +// t.Fatal(err) +// } +// } + +// func TestRTAdditionOnSuccessfulQuery(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) +// defer cancel() + +// // d1 := setupDHT(ctx, t, false) +// // d2 := setupDHT(ctx, t, false) +// // d3 := setupDHT(ctx, t, false) + +// d1 := newTestDHT(t) +// defer d1.Close() +// d2 := newTestDHT(t) +// defer d2.Close() +// d3 := newTestDHT(t) +// defer d3.Close() + +// connect(t, ctx, d1, d2) +// connect(t, ctx, d2, d3) +// // validate RT states + +// // d1 has d2 +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if !checkRoutingTable(d1, d2) { +// return fmt.Errorf("should have routes") +// } +// return nil +// })) +// // d2 has d3 +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if !checkRoutingTable(d2, d3) { +// return fmt.Errorf("should have routes") +// } +// return nil +// })) + +// // however, d1 does not know about d3 +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if checkRoutingTable(d1, d3) { +// return fmt.Errorf("should not have routes") +// } +// return nil +// })) + +// // but when d3 queries d2, d1 and d3 discover each other +// _, err := d3.GetClosestPeers(ctx, "something") +// require.NoError(t, err) +// require.NoError(t, tu.WaitFor(ctx, func() error { +// if !checkRoutingTable(d1, d3) { +// return fmt.Errorf("should have routes") +// } +// return nil +// })) +// } + +// func checkRoutingTable(a, b *IpfsDHT) bool { +// // loop until connection notification has been received. +// // under high load, this may not happen as immediately as we would like. 
+// return a.routingTable.Find(b.self) != "" && b.routingTable.Find(a.self) != "" +// } diff --git a/v2/stream_test.go b/v2/stream_test.go index 83e203d3e..12f610316 100644 --- a/v2/stream_test.go +++ b/v2/stream_test.go @@ -63,7 +63,7 @@ func newPeerPair(t testing.TB) (host.Host, *DHT) { cfg.Mode = ModeOptServer serverDHT, err := New(server, cfg) - fillRoutingTable(t, serverDHT) + fillRoutingTable(t, serverDHT, 250) t.Cleanup(func() { if err = serverDHT.Close(); err != nil { From 4c47b81e1c85c25b1f9beca51a394371436a3aa9 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:04:04 +0100 Subject: [PATCH 2/7] Add AddAddresses method to DHT --- v2/coord/coordinator.go | 6 +- v2/coord/coordinator_test.go | 24 ++-- v2/coord/routing.go | 3 + v2/dht.go | 11 +- v2/dht_test.go | 22 +++- v2/internal/kadtest/tracing.go | 28 ++++ v2/query_test.go | 232 ++------------------------------- 7 files changed, 81 insertions(+), 245 deletions(-) create mode 100644 v2/internal/kadtest/tracing.go diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 9476d8d6f..8d358d75a 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -118,10 +118,10 @@ func (cfg *CoordinatorConfig) Validate() error { } } - return nil + return nil; } -func DefaultConfig() (*CoordinatorConfig, error) { +func DefaultCoordinatorConfig() (*CoordinatorConfig, error) { telemetry, err := tele.NewWithGlobalProviders() if err != nil { return nil, fmt.Errorf("new telemetry: %w", err) @@ -141,7 +141,7 @@ func DefaultConfig() (*CoordinatorConfig, error) { func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) { if cfg == nil { - c, err := DefaultConfig() + c, err := DefaultCoordinatorConfig() if err != nil { return nil, fmt.Errorf("default config: %w", err) } diff --git a/v2/coord/coordinator_test.go b/v2/coord/coordinator_test.go index f7b67e670..7908674d4 100644 --- a/v2/coord/coordinator_test.go +++ b/v2/coord/coordinator_test.go @@ -37,14 +37,14 @@ func expectEventType(t *testing.T, ctx context.Context, events <-chan RoutingNot func TestConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) require.NoError(t, cfg.Validate()) }) t.Run("clock is not nil", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.Clock = nil @@ -52,7 +52,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("query concurrency positive", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.QueryConcurrency = 0 @@ -62,7 +62,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("query timeout positive", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.QueryTimeout = 0 @@ -72,7 +72,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("request concurrency positive", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.RequestConcurrency = 0 @@ -82,7 +82,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("request timeout positive", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.RequestTimeout = 0 @@ -92,7 +92,7 @@ func TestConfigValidate(t *testing.T) 
{ }) t.Run("logger not nil", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.Logger = nil @@ -100,7 +100,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("telemetry not nil", func(t *testing.T) { - cfg, err := DefaultConfig() + cfg, err := DefaultCoordinatorConfig() require.NoError(t, err) cfg.Tele = nil @@ -115,7 +115,7 @@ func TestExhaustiveQuery(t *testing.T) { clk := clock.NewMock() _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultConfig() + ccfg, err := DefaultCoordinatorConfig() require.NoError(t, err) ccfg.Clock = clk @@ -156,7 +156,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultConfig() + ccfg, err := DefaultCoordinatorConfig() require.NoError(t, err) ccfg.Clock = clk @@ -221,7 +221,7 @@ func TestBootstrap(t *testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultConfig() + ccfg, err := DefaultCoordinatorConfig() require.NoError(t, err) ccfg.Clock = clk @@ -278,7 +278,7 @@ func TestIncludeNode(t *testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultConfig() + ccfg, err := DefaultCoordinatorConfig() require.NoError(t, err) ccfg.Clock = clk diff --git a/v2/coord/routing.go b/v2/coord/routing.go index 103d9f207..4808c18ec 100644 --- a/v2/coord/routing.go +++ b/v2/coord/routing.go @@ -281,9 +281,11 @@ func (r *RoutingBehaviour) advanceBootstrap(ctx context.Context, ev routing.Boot func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.IncludeEvent) (BehaviourEvent, bool) { ctx, span := r.tracer.Start(ctx, "RoutingBehaviour.advanceInclude") defer span.End() + istate := r.include.Advance(ctx, ev) switch st := istate.(type) { case *routing.StateIncludeFindNodeMessage[KadKey, ma.Multiaddr]: + span.SetAttributes(attribute.String("out_event", "EventOutboundGetCloserNodes")) // include wants to send a find node message to a node return &EventOutboundGetCloserNodes{ QueryID: "include", @@ -301,6 +303,7 @@ func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ }) // return the event to notify outwards too + span.SetAttributes(attribute.String("out_event", "EventRoutingUpdated")) return &EventRoutingUpdated{ NodeInfo: NodeInfoToAddrInfo(st.NodeInfo), }, true diff --git a/v2/dht.go b/v2/dht.go index cdc827ce1..31787ae54 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -152,7 +152,13 @@ func New(h host.Host, cfg *Config) (*DHT, error) { } // instantiate a new Kademlia DHT coordinator. - d.kad, err = coord.NewCoordinator(d.host.ID(), &Router{host: h}, d.rt, nil) + coordCfg, err := coord.DefaultCoordinatorConfig() + if err != nil { + return nil, fmt.Errorf("new coordinator config: %w", err) + } + coordCfg.Tele = d.tele + + d.kad, err = coord.NewCoordinator(d.host.ID(), &Router{host: h}, d.rt, coordCfg) if err != nil { return nil, fmt.Errorf("new coordinator: %w", err) } @@ -303,6 +309,9 @@ func (d *DHT) logErr(err error, msg string) { // AddAddresses suggests peers and their associated addresses to be added to the routing table. // Addresses will be added to the peerstore with the supplied time to live. 
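+//
+// A minimal usage sketch (here ai is assumed to be the peer.AddrInfo of a reachable node):
+//
+//	if err := d.AddAddresses(ctx, []peer.AddrInfo{ai}, time.Minute); err != nil {
+//		// handle the error
+//	}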
func (d *DHT) AddAddresses(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error { + ctx, span := d.tele.Tracer.Start(ctx, "DHT.AddAddresses") + defer span.End() + return d.kad.AddNodes(ctx, ais, ttl) } diff --git a/v2/dht_test.go b/v2/dht_test.go index c47eda860..d0ad40748 100644 --- a/v2/dht_test.go +++ b/v2/dht_test.go @@ -95,19 +95,29 @@ func TestAddAddresses(t *testing.T) { ctx, cancel := kadtest.CtxShort(t) defer cancel() - local := newClientDht(t, nil) + localCfg := DefaultConfig() + localCfg.TracerProvider = kadtest.JaegerTracerProvider(t) + + local := newClientDht(t, localCfg) remote := newServerDht(t, nil) - // Populate and entries in remote's routing table so it responds to a connectivity check + // Populate entries in remote's routing table so it passes a connectivity check fillRoutingTable(t, remote, 1) - // TODO: add method to DHT to return its address? - pstore := remote.host.Peerstore() - remoteAddrInfo := pstore.PeerInfo(remote.host.ID()) + // local routing table should not contain the node + _, err := local.kad.GetNode(ctx, remote.host.ID()) + require.ErrorIs(t, err, coord.ErrNodeNotFound) + + remoteAddrInfo := peer.AddrInfo{ + ID: remote.host.ID(), + Addrs: remote.host.Addrs(), + } + require.NotEmpty(t, remoteAddrInfo.ID) + require.NotEmpty(t, remoteAddrInfo.Addrs) // Add remote's addresss to the local dht - err := local.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute) + err = local.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute) require.NoError(t, err) // the include state machine runs in the background and eventually should add the node to routing table diff --git a/v2/internal/kadtest/tracing.go b/v2/internal/kadtest/tracing.go new file mode 100644 index 000000000..526555c26 --- /dev/null +++ b/v2/internal/kadtest/tracing.go @@ -0,0 +1,28 @@ +package kadtest + +import ( + "fmt" + "testing" + + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/trace" +) + +// JaegerTracerProvider creates a tracer provider that exports traces to a Jaeger instance running +// on localhost on port 14268 +func JaegerTracerProvider(t *testing.T) *trace.TracerProvider { + t.Helper() + + traceHost := "127.0.0.1" + tracePort := 14268 + + endpoint := fmt.Sprintf("http://%s:%d/api/traces", traceHost, tracePort) + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(endpoint))) + if err != nil { + t.Fatalf("failed to create jaeger exporter: %v", err) + } + + tp := trace.NewTracerProvider(trace.WithBatcher(exp)) + + return tp +} diff --git a/v2/query_test.go b/v2/query_test.go index 996d6e833..29fa004a1 100644 --- a/v2/query_test.go +++ b/v2/query_test.go @@ -8,113 +8,6 @@ import ( "github.com/stretchr/testify/require" ) -// import ( -// "context" -// "fmt" -// "testing" -// "time" - -// "github.com/libp2p/go-libp2p" -// tu "github.com/libp2p/go-libp2p-testing/etc" -// "github.com/libp2p/go-libp2p/core/host" -// "github.com/libp2p/go-libp2p/core/peer" - -// "github.com/stretchr/testify/require" -// ) - -// TODO Debug test failures due to timing issue on windows -// Tests are timing dependent as can be seen in the 2 seconds timed context that we use in "tu.WaitFor". -// While the tests work fine on OSX and complete in under a second, -// they repeatedly fail to complete in the stipulated time on Windows. -// However, increasing the timeout makes them pass on Windows. 
- -// func TestRTEvictionOnFailedQuery(t *testing.T) { -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) -// defer cancel() - -// d1 := setupDHT(ctx, t, false) -// d2 := setupDHT(ctx, t, false) - -// for i := 0; i < 10; i++ { -// connect(t, ctx, d1, d2) -// for _, conn := range d1.host.Network().ConnsToPeer(d2.self) { -// conn.Close() -// } -// } - -// // peers should be in the RT because of fixLowPeers -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if !checkRoutingTable(d1, d2) { -// return fmt.Errorf("should have routes") -// } -// return nil -// })) - -// // close both hosts so query fails -// require.NoError(t, d1.host.Close()) -// require.NoError(t, d2.host.Close()) -// // peers will still be in the RT because we have decoupled membership from connectivity -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if !checkRoutingTable(d1, d2) { -// return fmt.Errorf("should have routes") -// } -// return nil -// })) - -// // failed queries should remove the peers from the RT -// _, err := d1.GetClosestPeers(ctx, "test") -// require.NoError(t, err) - -// _, err = d2.GetClosestPeers(ctx, "test") -// require.NoError(t, err) - -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if checkRoutingTable(d1, d2) { -// return fmt.Errorf("should not have routes") -// } -// return nil -// })) -// } - -// func newPeerPair(t testing.TB) (host.Host, *DHT) { -// listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0") - -// client, err := libp2p.New(listenAddr) -// require.NoError(t, err) - -// server, err := libp2p.New(listenAddr) -// require.NoError(t, err) - -// cfg := DefaultConfig() -// cfg.Mode = ModeOptServer -// serverDHT, err := New(server, cfg) - -// fillRoutingTable(t, serverDHT) - -// t.Cleanup(func() { -// if err = serverDHT.Close(); err != nil { -// t.Logf("failed closing DHT: %s", err) -// } - -// if err = client.Close(); err != nil { -// t.Logf("failed closing client host: %s", err) -// } - -// if err = server.Close(); err != nil { -// t.Logf("failed closing client host: %s", err) -// } -// }) - -// ctx := context.Background() -// err = client.Connect(ctx, peer.AddrInfo{ -// ID: server.ID(), -// Addrs: server.Addrs(), -// }) -// require.NoError(t, err) - -// return client, serverDHT -// } - func newServerHost(t testing.TB) host.Host { listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0") @@ -146,6 +39,11 @@ func newClientHost(t testing.TB) host.Host { func newServerDht(t testing.TB, cfg *Config) *DHT { h := newServerHost(t) + if cfg == nil { + cfg = DefaultConfig() + } + cfg.Mode = ModeOptServer + d, err := New(h, cfg) require.NoError(t, err) @@ -160,6 +58,10 @@ func newServerDht(t testing.TB, cfg *Config) *DHT { func newClientDht(t testing.TB, cfg *Config) *DHT { h := newClientHost(t) + if cfg == nil { + cfg = DefaultConfig() + } + cfg.Mode = ModeOptClient d, err := New(h, cfg) require.NoError(t, err) @@ -170,119 +72,3 @@ func newClientDht(t testing.TB, cfg *Config) *DHT { }) return d } - -// func newTestDHTForHostWithConfig(t testing.TB, h host.Host, cfg *Config) *DHT { -// t.Helper() - -// h, err := libp2p.New(libp2p.NoListenAddrs) -// require.NoError(t, err) - -// d, err := New(h, cfg) -// require.NoError(t, err) - -// t.Cleanup(func() { -// if err = d.Close(); err != nil { -// t.Logf("closing dht: %s", err) -// } - -// if err = h.Close(); err != nil { -// t.Logf("closing host: %s", err) -// } -// }) - -// return d -// } - -// func connect(t *testing.T, ctx context.Context, a, b *DHT) { -// t.Helper() -// connectNoSync(t, 
ctx, a, b) -// wait(t, ctx, a, b) -// wait(t, ctx, b, a) -// } - -// func wait(t *testing.T, ctx context.Context, a, b *DHT) { -// t.Helper() - -// // loop until connection notification has been received. -// // under high load, this may not happen as immediately as we would like. -// for a.routingTable.Find(b.self) == "" { -// select { -// case <-ctx.Done(): -// t.Fatal(ctx.Err()) -// case <-time.After(time.Millisecond * 5): -// } -// } -// } - -// func connectNoSync(t *testing.T, ctx context.Context, a, b *DHT) { -// t.Helper() - -// idB := b.self -// addrB := b.peerstore.Addrs(idB) -// if len(addrB) == 0 { -// t.Fatal("peers setup incorrectly: no local address") -// } - -// if err := a.host.Connect(ctx, peer.AddrInfo{ID: idB, Addrs: addrB}); err != nil { -// t.Fatal(err) -// } -// } - -// func TestRTAdditionOnSuccessfulQuery(t *testing.T) { -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) -// defer cancel() - -// // d1 := setupDHT(ctx, t, false) -// // d2 := setupDHT(ctx, t, false) -// // d3 := setupDHT(ctx, t, false) - -// d1 := newTestDHT(t) -// defer d1.Close() -// d2 := newTestDHT(t) -// defer d2.Close() -// d3 := newTestDHT(t) -// defer d3.Close() - -// connect(t, ctx, d1, d2) -// connect(t, ctx, d2, d3) -// // validate RT states - -// // d1 has d2 -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if !checkRoutingTable(d1, d2) { -// return fmt.Errorf("should have routes") -// } -// return nil -// })) -// // d2 has d3 -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if !checkRoutingTable(d2, d3) { -// return fmt.Errorf("should have routes") -// } -// return nil -// })) - -// // however, d1 does not know about d3 -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if checkRoutingTable(d1, d3) { -// return fmt.Errorf("should not have routes") -// } -// return nil -// })) - -// // but when d3 queries d2, d1 and d3 discover each other -// _, err := d3.GetClosestPeers(ctx, "something") -// require.NoError(t, err) -// require.NoError(t, tu.WaitFor(ctx, func() error { -// if !checkRoutingTable(d1, d3) { -// return fmt.Errorf("should have routes") -// } -// return nil -// })) -// } - -// func checkRoutingTable(a, b *IpfsDHT) bool { -// // loop until connection notification has been received. -// // under high load, this may not happen as immediately as we would like. 
-// return a.routingTable.Find(b.self) != "" && b.routingTable.Find(a.self) != "" -// } From 82dc09bec65136792037841e00f045f6f1d5569e Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:13:04 +0100 Subject: [PATCH 3/7] go mod tidy --- v2/go.mod | 1 + v2/go.sum | 3 +++ 2 files changed, 4 insertions(+) diff --git a/v2/go.mod b/v2/go.mod index 85b481ae7..1473c15bb 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -18,6 +18,7 @@ require ( github.com/plprobelab/go-kademlia v0.0.0-20230901130940-286ab4ceca60 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/otel v1.17.0 + go.opentelemetry.io/otel/exporters/jaeger v1.16.0 go.opentelemetry.io/otel/metric v1.17.0 go.opentelemetry.io/otel/sdk v1.17.0 go.opentelemetry.io/otel/sdk/metric v0.40.0 diff --git a/v2/go.sum b/v2/go.sum index d392ea5d0..bc586fb55 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -325,6 +325,7 @@ github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -347,6 +348,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= +go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE= From 2c817b3b8509e7b4a793654f701850fa4350410d Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:29:32 +0100 Subject: [PATCH 4/7] Rename Query Skip errors --- v2/coord/coordinator.go | 4 ++-- v2/coord/coretypes.go | 8 ++++---- v2/routing.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 8d358d75a..6c5ecddf8 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -388,12 +388,12 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q } err = fn(ctx, nh, lastStats) - if errors.Is(err, SkipRemaining) { + if errors.Is(err, ErrSkipRemaining) { // done c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID}) return lastStats, nil } - if errors.Is(err, SkipNode) { + if errors.Is(err, ErrSkipNode) { // TODO: don't add closer nodes from this node break } diff --git a/v2/coord/coretypes.go b/v2/coord/coretypes.go index 660b503ac..fe72d90fc 100644 --- a/v2/coord/coretypes.go +++ b/v2/coord/coretypes.go @@ 
-70,11 +70,11 @@ type QueryStats struct { } var ( - // SkipNode is used as a return value from a QueryFunc to indicate that the node is to be skipped. - SkipNode = errors.New("skip node") //nolint:all + // ErrSkipNode is used as a return value from a QueryFunc to indicate that the node is to be skipped. + ErrSkipNode = errors.New("skip node") - // SkipRemaining is used as a return value a QueryFunc to indicate that all remaining nodes are to be skipped. - SkipRemaining = errors.New("skip remaining nodes") //nolint:all + // ErrSkipRemaining is used as a return value a QueryFunc to indicate that all remaining nodes are to be skipped. + ErrSkipRemaining = errors.New("skip remaining nodes") ) // Router its a work in progress diff --git a/v2/routing.go b/v2/routing.go index eacc9d8da..cc72f849a 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -47,7 +47,7 @@ func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { slog.Info("visiting node", "id", node.ID()) if node.ID() == id { foundNode = node - return coord.SkipRemaining + return coord.ErrSkipRemaining } return nil } From 04482b7a5ddba2e5d5e310a919cddb9a9537b285 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:30:53 +0100 Subject: [PATCH 5/7] go fmt coordinator.go --- v2/coord/coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 6c5ecddf8..045331b81 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -118,7 +118,7 @@ func (cfg *CoordinatorConfig) Validate() error { } } - return nil; + return nil } func DefaultCoordinatorConfig() (*CoordinatorConfig, error) { From a2544e8b58acdd80f2f857a6fb6a534c8a2436a5 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 7 Sep 2023 16:55:15 +0100 Subject: [PATCH 6/7] Fix test flakes --- v2/coord/behaviour.go | 4 +- v2/coord/coordinator.go | 4 +- v2/coord/coordinator_test.go | 133 ++++++++++++++++++--------- v2/coord/internal/nettest/layouts.go | 2 +- v2/coord/network.go | 26 ++++-- v2/coord/network_test.go | 3 +- v2/coord/routing.go | 2 +- v2/dht_test.go | 1 - v2/internal/kadtest/tracing.go | 5 + 9 files changed, 122 insertions(+), 58 deletions(-) diff --git a/v2/coord/behaviour.go b/v2/coord/behaviour.go index 2f9f9e33f..aa69917f0 100644 --- a/v2/coord/behaviour.go +++ b/v2/coord/behaviour.go @@ -56,7 +56,7 @@ type WorkQueue[E BehaviourEvent] struct { func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] { w := &WorkQueue[E]{ - pending: make(chan pendingEvent[E], 16), + pending: make(chan pendingEvent[E], 1), fn: fn, } return w @@ -113,7 +113,7 @@ var _ Notify[BehaviourEvent] = (*Waiter[BehaviourEvent])(nil) func NewWaiter[E BehaviourEvent]() *Waiter[E] { w := &Waiter[E]{ - pending: make(chan WaiterEvent[E], 16), + pending: make(chan WaiterEvent[E], 1), } return w } diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 045331b81..579701a4f 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -201,7 +201,7 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, cfg.Tele.Tracer) - networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger) + networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, cfg.Tele.Tracer) ctx, cancel := context.WithCancel(context.Background()) @@ -352,7 +352,7 @@ func (c *Coordinator) Query(ctx context.Context, 
target KadKey, fn QueryFunc) (Q } waiter := NewWaiter[BehaviourEvent]() - queryID := query.QueryID("foo") + queryID := query.QueryID("foo") // TODO: choose query ID cmd := &EventStartQuery{ QueryID: queryID, diff --git a/v2/coord/coordinator_test.go b/v2/coord/coordinator_test.go index 7908674d4..87f73cabc 100644 --- a/v2/coord/coordinator_test.go +++ b/v2/coord/coordinator_test.go @@ -5,36 +5,85 @@ import ( "fmt" "log" "reflect" + "sync" "testing" "time" "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) const peerstoreTTL = 10 * time.Minute -// expectEventType selects on the event channel until an event of the expected type is sent. -func expectEventType(t *testing.T, ctx context.Context, events <-chan RoutingNotification, expected RoutingNotification) (RoutingNotification, error) { +type notificationWatcher struct { + mu sync.Mutex + buffered []RoutingNotification + signal chan struct{} +} + +func (w *notificationWatcher) Watch(t *testing.T, ctx context.Context, ch <-chan RoutingNotification) { t.Helper() + w.signal = make(chan struct{}, 1) + go func() { + for { + select { + case <-ctx.Done(): + return + case ev := <-ch: + w.mu.Lock() + t.Logf("buffered routing notification: %T\n", ev) + w.buffered = append(w.buffered, ev) + select { + case w.signal <- struct{}{}: + default: + } + w.mu.Unlock() + + } + } + }() +} + +func (w *notificationWatcher) Expect(ctx context.Context, expected RoutingNotification) (RoutingNotification, error) { for { - select { - case ev := <-events: - t.Logf("saw event: %T\n", ev) + // look in buffered events + w.mu.Lock() + for i, ev := range w.buffered { if reflect.TypeOf(ev) == reflect.TypeOf(expected) { + // remove first from buffer and return it + w.buffered = w.buffered[:i+copy(w.buffered[i:], w.buffered[i+1:])] + w.mu.Unlock() return ev, nil } + } + w.mu.Unlock() + + // wait to be signaled that there is a new event + select { case <-ctx.Done(): return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected) + case <-w.signal: } } } +// tracingTelemetry may be used to create a Telemetry that traces a test +func tracingTelemetry(t *testing.T) *tele.Telemetry { + telemetry, err := tele.New(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t)) + if err != nil { + t.Fatalf("unexpected error creating telemetry: %v", err) + } + + return telemetry +} + func TestConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { cfg, err := DefaultCoordinatorConfig() @@ -171,17 +220,8 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { log.Fatalf("unexpected error creating coordinator: %v", err) } - buffer := make(chan RoutingNotification, 5) - go func() { - for { - select { - case <-ctx.Done(): - return - case ev := <-c.RoutingNotifications(): - buffer <- ev - } - } - }() + w := new(notificationWatcher) + w.Watch(t, ctx, c.RoutingNotifications()) qfn := func(ctx context.Context, node Node, stats QueryStats) error { return nil @@ -195,22 +235,29 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { // the query run by the dht should have received a response from nodes[1] with closer nodes // nodes[0] and nodes[2] which should trigger a routing table update since nodes[2] was // 
not in the dht's routing table. - ev, err := expectEventType(t, ctx, buffer, &EventRoutingUpdated{}) - require.NoError(t, err) - - tev := ev.(*EventRoutingUpdated) - require.Equal(t, nodes[2].NodeInfo.ID, tev.NodeInfo.ID) + // the query then continues and should have received a response from nodes[2] with closer nodes + // nodes[1] and nodes[3] which should trigger a routing table update since nodes[3] was + // not in the dht's routing table. // no EventRoutingUpdated is sent for the self node - // the query continues and should have received a response from nodes[2] with closer nodes - // nodes[1] and nodes[3] which should trigger a routing table update since nodes[3] was - // not in the dht's routing table. - ev, err = expectEventType(t, ctx, buffer, &EventRoutingUpdated{}) + // However the order in which these events are emitted may vary depending on timing. + + ev1, err := w.Expect(ctx, &EventRoutingUpdated{}) require.NoError(t, err) + tev1 := ev1.(*EventRoutingUpdated) - tev = ev.(*EventRoutingUpdated) - require.Equal(t, nodes[3].NodeInfo.ID, tev.NodeInfo.ID) + ev2, err := w.Expect(ctx, &EventRoutingUpdated{}) + require.NoError(t, err) + tev2 := ev2.(*EventRoutingUpdated) + + if tev1.NodeInfo.ID == nodes[2].NodeInfo.ID { + require.Equal(t, nodes[3].NodeInfo.ID, tev2.NodeInfo.ID) + } else if tev2.NodeInfo.ID == nodes[2].NodeInfo.ID { + require.Equal(t, nodes[3].NodeInfo.ID, tev1.NodeInfo.ID) + } else { + require.Failf(t, "did not see routing updated event for %s", nodes[2].NodeInfo.ID.String()) + } } func TestBootstrap(t *testing.T) { @@ -231,24 +278,15 @@ func TestBootstrap(t *testing.T) { d, err := NewCoordinator(self, nodes[0].Router, nodes[0].RoutingTable, ccfg) require.NoError(t, err) - buffer := make(chan RoutingNotification, 5) - go func() { - for { - select { - case <-ctx.Done(): - return - case ev := <-d.RoutingNotifications(): - buffer <- ev - } - } - }() + w := new(notificationWatcher) + w.Watch(t, ctx, d.RoutingNotifications()) seeds := []peer.ID{nodes[1].NodeInfo.ID} err = d.Bootstrap(ctx, seeds) require.NoError(t, err) // the query run by the dht should have completed - ev, err := expectEventType(t, ctx, buffer, &EventBootstrapFinished{}) + ev, err := w.Expect(ctx, &EventBootstrapFinished{}) require.NoError(t, err) require.IsType(t, &EventBootstrapFinished{}, ev) @@ -257,15 +295,21 @@ func TestBootstrap(t *testing.T) { require.Equal(t, 3, tevf.Stats.Success) require.Equal(t, 0, tevf.Stats.Failure) - // DHT should now have node1 in its routing table + _, err = w.Expect(ctx, &EventRoutingUpdated{}) + require.NoError(t, err) + + _, err = w.Expect(ctx, &EventRoutingUpdated{}) + require.NoError(t, err) + + // coordinator will have node1 in its routing table _, err = d.GetNode(ctx, nodes[1].NodeInfo.ID) require.NoError(t, err) - // DHT should now have node2 in its routing table + // coordinator should now have node2 in its routing table _, err = d.GetNode(ctx, nodes[2].NodeInfo.ID) require.NoError(t, err) - // DHT should now have node3 in its routing table + // coordinator should now have node3 in its routing table _, err = d.GetNode(ctx, nodes[3].NodeInfo.ID) require.NoError(t, err) } @@ -296,14 +340,15 @@ func TestIncludeNode(t *testing.T) { _, err = d.GetNode(ctx, candidate.ID) require.ErrorIs(t, err, ErrNodeNotFound) - events := d.RoutingNotifications() + w := new(notificationWatcher) + w.Watch(t, ctx, d.RoutingNotifications()) // inject a new node into the dht's includeEvents queue err = d.AddNodes(ctx, []peer.AddrInfo{candidate}, time.Minute) require.NoError(t, err) // 
the include state machine runs in the background and eventually should add the node to routing table - ev, err := expectEventType(t, ctx, events, &EventRoutingUpdated{}) + ev, err := w.Expect(ctx, &EventRoutingUpdated{}) require.NoError(t, err) tev := ev.(*EventRoutingUpdated) diff --git a/v2/coord/internal/nettest/layouts.go b/v2/coord/internal/nettest/layouts.go index a49a94bb3..f5236dc1f 100644 --- a/v2/coord/internal/nettest/layouts.go +++ b/v2/coord/internal/nettest/layouts.go @@ -38,7 +38,7 @@ func LinearTopology(n int, clk clock.Clock) (*Topology, []*Node, error) { nodes[i] = &Node{ NodeInfo: ai, Router: NewRouter(ai.ID, top), - RoutingTable: simplert.New[key.Key256, kad.NodeID[key.Key256]](kadt.PeerID(ai.ID), 2), + RoutingTable: simplert.New[key.Key256, kad.NodeID[key.Key256]](kadt.PeerID(ai.ID), 20), } } diff --git a/v2/coord/network.go b/v2/coord/network.go index bf241b5d1..4e492f001 100644 --- a/v2/coord/network.go +++ b/v2/coord/network.go @@ -10,6 +10,7 @@ import ( "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/query" + "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" @@ -27,20 +28,25 @@ type NetworkBehaviour struct { ready chan struct{} logger *slog.Logger + tracer trace.Tracer } -func NewNetworkBehaviour(rtr Router, logger *slog.Logger) *NetworkBehaviour { +func NewNetworkBehaviour(rtr Router, logger *slog.Logger, tracer trace.Tracer) *NetworkBehaviour { b := &NetworkBehaviour{ rtr: rtr, nodeHandlers: make(map[peer.ID]*NodeHandler), ready: make(chan struct{}, 1), logger: logger, + tracer: tracer, } return b } func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { + ctx, span := b.tracer.Start(ctx, "NetworkBehaviour.Notify") + defer span.End() + b.pendingMu.Lock() defer b.pendingMu.Unlock() @@ -49,7 +55,7 @@ func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { b.nodeHandlersMu.Lock() nh, ok := b.nodeHandlers[ev.To.ID] if !ok { - nh = NewNodeHandler(ev.To, b.rtr, b.logger) + nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer) b.nodeHandlers[ev.To.ID] = nh } b.nodeHandlersMu.Unlock() @@ -71,6 +77,8 @@ func (b *NetworkBehaviour) Ready() <-chan struct{} { } func (b *NetworkBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) { + ctx, span := b.tracer.Start(ctx, "NetworkBehaviour.Perform") + defer span.End() // No inbound work can be done until Perform is complete b.pendingMu.Lock() defer b.pendingMu.Unlock() @@ -100,7 +108,7 @@ func (b *NetworkBehaviour) getNodeHandler(ctx context.Context, id peer.ID) (*Nod if err != nil { return nil, err } - nh = NewNodeHandler(info, b.rtr, b.logger) + nh = NewNodeHandler(info, b.rtr, b.logger, b.tracer) b.nodeHandlers[id] = nh } b.nodeHandlersMu.Unlock() @@ -112,13 +120,15 @@ type NodeHandler struct { rtr Router queue *WorkQueue[NodeHandlerRequest] logger *slog.Logger + tracer trace.Tracer } -func NewNodeHandler(self peer.AddrInfo, rtr Router, logger *slog.Logger) *NodeHandler { +func NewNodeHandler(self peer.AddrInfo, rtr Router, logger *slog.Logger, tracer trace.Tracer) *NodeHandler { h := &NodeHandler{ self: self, rtr: rtr, logger: logger, + tracer: tracer, } h.queue = NewWorkQueue(h.send) @@ -127,6 +137,8 @@ func NewNodeHandler(self peer.AddrInfo, rtr Router, logger *slog.Logger) *NodeHa } func (h *NodeHandler) Notify(ctx context.Context, ev NodeHandlerRequest) { + ctx, span := h.tracer.Start(ctx, "NodeHandler.Notify") + defer span.End() 
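+	// hand the request to the node's work queue, which processes it via h.send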
 	h.queue.Enqueue(ctx, ev)
 }
 
@@ -142,7 +154,7 @@ func (h *NodeHandler) send(ctx context.Context, ev NodeHandlerRequest) bool {
 				QueryID: cmd.QueryID,
 				To:      h.self,
 				Target:  cmd.Target,
-				Err:     fmt.Errorf("send: %w", err),
+				Err:     fmt.Errorf("NodeHandler: %w", err),
 			})
 			return false
 		}
@@ -171,6 +183,8 @@ func (h *NodeHandler) Addresses() []ma.Multiaddr {
 // GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
 // The node may return fewer nodes than requested.
 func (h *NodeHandler) GetClosestNodes(ctx context.Context, k KadKey, n int) ([]Node, error) {
+	ctx, span := h.tracer.Start(ctx, "NodeHandler.GetClosestNodes")
+	defer span.End()
 	w := NewWaiter[BehaviourEvent]()
 
 	ev := &EventOutboundGetCloserNodes{
@@ -192,7 +206,7 @@ func (h *NodeHandler) GetClosestNodes(ctx context.Context, k KadKey, n int) ([]N
 	nodes := make([]Node, 0, len(res.CloserNodes))
 	for _, info := range res.CloserNodes {
 		// TODO use a global registry of node handlers
-		nodes = append(nodes, NewNodeHandler(info, h.rtr, h.logger))
+		nodes = append(nodes, NewNodeHandler(info, h.rtr, h.logger, h.tracer))
 		n--
 		if n == 0 {
 			break
diff --git a/v2/coord/network_test.go b/v2/coord/network_test.go
index cbba0f590..ad0f3146f 100644
--- a/v2/coord/network_test.go
+++ b/v2/coord/network_test.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/benbjohnson/clock"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/exp/slog"
 
 	"github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest"
@@ -21,7 +22,7 @@ func TestGetClosestNodes(t *testing.T) {
 	_, nodes, err := nettest.LinearTopology(4, clk)
 	require.NoError(t, err)
 
-	h := NewNodeHandler(nodes[1].NodeInfo, nodes[1].Router, slog.Default())
+	h := NewNodeHandler(nodes[1].NodeInfo, nodes[1].Router, slog.Default(), trace.NewNoopTracerProvider().Tracer(""))
 
 	// node 1 has node 2 in its routing table so it will return it along with node 0
 	found, err := h.GetClosestNodes(ctx, kadt.PeerID(nodes[2].NodeInfo.ID).Key(), 2)
diff --git a/v2/coord/routing.go b/v2/coord/routing.go
index 4808c18ec..488ac6898 100644
--- a/v2/coord/routing.go
+++ b/v2/coord/routing.go
@@ -314,7 +314,7 @@ func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ
 	case *routing.StateIncludeWaitingFull:
 		// nothing to do except wait for message response or timeout
 	case *routing.StateIncludeIdle:
-		// nothing to do except wait for message response or timeout
+		// nothing to do except wait for new nodes to be added to the queue
 	default:
 		panic(fmt.Sprintf("unexpected include state: %T", st))
 	}
diff --git a/v2/dht_test.go b/v2/dht_test.go
index d0ad40748..b42b77b5d 100644
--- a/v2/dht_test.go
+++ b/v2/dht_test.go
@@ -96,7 +96,6 @@ func TestAddAddresses(t *testing.T) {
 	defer cancel()
 
 	localCfg := DefaultConfig()
-	localCfg.TracerProvider = kadtest.JaegerTracerProvider(t)
 
 	local := newClientDht(t, localCfg)
 
diff --git a/v2/internal/kadtest/tracing.go b/v2/internal/kadtest/tracing.go
index 526555c26..dc7c82c80 100644
--- a/v2/internal/kadtest/tracing.go
+++ b/v2/internal/kadtest/tracing.go
@@ -1,6 +1,7 @@
 package kadtest
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -24,5 +25,9 @@ func JaegerTracerProvider(t *testing.T) *trace.TracerProvider {
 
 	tp := trace.NewTracerProvider(trace.WithBatcher(exp))
 
+	t.Cleanup(func() {
+		_ = tp.Shutdown(context.Background())
+	})
+
 	return tp
 }

From 2491232ff414ab69dcf75e5eb4c1704a5a95c361 Mon Sep 17 00:00:00 2001
From: Ian Davis <18375+iand@users.noreply.github.com>
Date: Thu, 7 Sep 2023 16:59:35 +0100
Subject: [PATCH 7/7] Fix lint errors
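
Rename tracingTelemetry to TracingTelemetry and discard the unused
context returned by tracer.Start in NetworkBehaviour.Perform. For
reference, a minimal sketch of how the exported helper is intended to
be called from a test (the test name here is hypothetical):

	func TestQueryTraced(t *testing.T) {
		// Spans are exported via kadtest.JaegerTracerProvider and
		// flushed by its t.Cleanup hook when the test finishes.
		telemetry := TracingTelemetry(t)
		_ = telemetry // wire into the component under test
	}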
---
 v2/coord/coordinator_test.go | 4 ++--
 v2/coord/network.go          | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/v2/coord/coordinator_test.go b/v2/coord/coordinator_test.go
index 87f73cabc..235828ccb 100644
--- a/v2/coord/coordinator_test.go
+++ b/v2/coord/coordinator_test.go
@@ -74,8 +74,8 @@ func (w *notificationWatcher) Expect(ctx context.Context, expected RoutingNotifi
 	}
 }
 
-// tracingTelemetry may be used to create a Telemetry that traces a test
-func tracingTelemetry(t *testing.T) *tele.Telemetry {
+// TracingTelemetry may be used to create a Telemetry that traces a test
+func TracingTelemetry(t *testing.T) *tele.Telemetry {
 	telemetry, err := tele.New(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t))
 	if err != nil {
 		t.Fatalf("unexpected error creating telemetry: %v", err)
diff --git a/v2/coord/network.go b/v2/coord/network.go
index 4e492f001..eeb054026 100644
--- a/v2/coord/network.go
+++ b/v2/coord/network.go
@@ -77,7 +77,7 @@ func (b *NetworkBehaviour) Ready() <-chan struct{} {
 }
 
 func (b *NetworkBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) {
-	ctx, span := b.tracer.Start(ctx, "NetworkBehaviour.Perform")
+	_, span := b.tracer.Start(ctx, "NetworkBehaviour.Perform")
 	defer span.End()
 	// No inbound work can be done until Perform is complete
 	b.pendingMu.Lock()
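
A minimal sketch, not part of the patches, of how the pieces introduced
in this series fit together in a test. Identifiers mirror the diffs
above; the test name is hypothetical and the setup follows the existing
coordinator tests:

	func TestSeriesSketch(t *testing.T) { // hypothetical test name
		ctx := context.Background()
		clk := clock.New()
		_, nodes, err := nettest.LinearTopology(4, clk)
		require.NoError(t, err)

		// NewCoordinator starts the event loop; Close cancels it.
		d, err := NewCoordinator(nodes[0].NodeInfo.ID, nodes[0].Router, nodes[0].RoutingTable, DefaultConfig())
		require.NoError(t, err)
		defer d.Close()

		// buffer routing notifications in the background
		w := new(notificationWatcher)
		w.Watch(t, ctx, d.RoutingNotifications())

		// AddNodes now takes a TTL for the supplied addresses
		err = d.AddNodes(ctx, []peer.AddrInfo{nodes[1].NodeInfo}, time.Minute)
		require.NoError(t, err)

		// the include state machine eventually emits a routing update
		_, err = w.Expect(ctx, &EventRoutingUpdated{})
		require.NoError(t, err)
	}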