From 06155c535c1a44624988687396d2ee91c11a914e Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Thu, 15 Aug 2024 12:25:44 +0200 Subject: [PATCH 01/13] feat: add check using only a CID Fixes #6 --- daemon.go | 109 ++++++++++++++++++++++++++++++++++++++++-------------- main.go | 19 +++++++++- 2 files changed, 100 insertions(+), 28 deletions(-) diff --git a/daemon.go b/daemon.go index c922595..2a5a525 100644 --- a/daemon.go +++ b/daemon.go @@ -2,10 +2,8 @@ package main import ( "context" - "errors" "fmt" "log" - "net/url" "sync" "time" @@ -38,6 +36,10 @@ type daemon struct { createTestHost func() (host.Host, error) } +// number of providers at which to stop looking for providers in the DHT +// When doing a check only with a CID +var MaxProvidersCount = 3 + func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) { rm, err := NewResourceManager() if err != nil { @@ -108,18 +110,79 @@ func (d *daemon) mustStart() { } -func (d *daemon) runCheck(query url.Values) (*output, error) { - maStr := query.Get("multiaddr") - cidStr := query.Get("cid") +type providerOutput struct { + ID string + Addrs []string + ConnectionMaddrs []string + BitswapCheckOutput BitswapCheckOutput +} - if maStr == "" { - return nil, errors.New("missing 'multiaddr' argument") +func (d *daemon) runCidCheck(cidStr string) (*[]providerOutput, error) { + cid, err := cid.Decode(cidStr) + if err != nil { + return nil, err } - if cidStr == "" { - return nil, errors.New("missing 'cid' argument") + ctx := context.Background() + out := make([]providerOutput, 0, 3) + + queryCtx, cancel := context.WithCancel(ctx) + defer cancel() + provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount) + + for provider := range provsCh { + addrs := make([]string, len(provider.Addrs)) + for i, addr := range provider.Addrs { + addrs[i] = addr.String() + } + + provOutput := providerOutput{ + ID: provider.ID.String(), + Addrs: addrs, + BitswapCheckOutput: BitswapCheckOutput{}, + } + + testHost, err := d.createTestHost() + if err != nil { + return nil, fmt.Errorf("server error: %w", err) + } + defer testHost.Close() + + // Test Is the target connectable + dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) + + // we call NewStream to force NAT hole punching + // See https://github.com/libp2p/go-libp2p/issues/2714 + testHost.Connect(dialCtx, provider) + _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") + dialCancel() + + if connErr != nil { + provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) + } else { + // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections + provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + + for _, c := range testHost.Network().ConnsToPeer(provider.ID) { + provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) + } + } + + out = append(out, provOutput) } + return &out, nil +} + +type peerCheckOutput struct { + ConnectionError string + PeerFoundInDHT map[string]int + CidInDHT bool + ConnectionMaddrs []string + DataAvailableOverBitswap BitswapCheckOutput +} + +func (d *daemon) runPeerCheck(maStr, cidStr string) (*peerCheckOutput, error) { ma, err := multiaddr.NewMultiaddr(maStr) if err != nil { return nil, err @@ -139,11 +202,11 @@ func (d *daemon) runCheck(query url.Values) (*output, error) { } ctx := context.Background() - out := &output{} + out := &peerCheckOutput{} connectionFailed := false - out.CidInDHT = providerRecordInDHT(ctx, d.dht, c, ai.ID) + out.CidInDHT = providerRecordForPeerInDHT(ctx, d.dht, c, ai.ID) addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID) out.PeerFoundInDHT = addrMap @@ -202,6 +265,13 @@ func (d *daemon) runCheck(query url.Values) (*output, error) { return out, nil } +type BitswapCheckOutput struct { + Duration time.Duration + Found bool + Responded bool + Error string +} + func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiaddr.Multiaddr) BitswapCheckOutput { log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the temporary peer: %s", c, ma, host.ID()) out := BitswapCheckOutput{} @@ -223,21 +293,6 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd return out } -type BitswapCheckOutput struct { - Duration time.Duration - Found bool - Responded bool - Error string -} - -type output struct { - ConnectionError string - PeerFoundInDHT map[string]int - CidInDHT bool - ConnectionMaddrs []string - DataAvailableOverBitswap BitswapCheckOutput -} - func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMessenger, p peer.ID) (map[string]int, error) { closestPeers, err := d.GetClosestPeers(ctx, string(p)) if err != nil { @@ -281,7 +336,7 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe return addrMap, nil } -func providerRecordInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool { +func providerRecordForPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool { queryCtx, cancel := context.WithCancel(ctx) defer cancel() provsCh := d.FindProvidersAsync(queryCtx, c, 0) diff --git a/main.go b/main.go index c330cb0..1f85045 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "crypto/subtle" "encoding/json" + "errors" "log" "net" "net/http" @@ -77,7 +78,23 @@ func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, m checkHandler := func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Access-Control-Allow-Origin", "*") - data, err := d.runCheck(r.URL.Query()) + + maStr := r.URL.Query().Get("multiaddr") + cidStr := r.URL.Query().Get("cid") + + if cidStr == "" { + err = errors.New("missing 'cid' argument") + } + + var err error + var data interface{} + + if maStr == "" { + data, err = d.runCidCheck(cidStr) + } else { + data, err = d.runPeerCheck(maStr, cidStr) + } + if err == nil { w.Header().Add("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(data) From cdb4edc9d2bb25e01acb3c767bb11991d75e96b9 Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Mon, 26 Aug 2024 16:47:42 +0200 Subject: [PATCH 02/13] feat: run checks concurrently --- daemon.go | 74 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/daemon.go b/daemon.go index 2a5a525..a0c6758 100644 --- a/daemon.go +++ b/daemon.go @@ -124,53 +124,65 @@ func (d *daemon) runCidCheck(cidStr string) (*[]providerOutput, error) { } ctx := context.Background() - out := make([]providerOutput, 0, 3) + out := make([]providerOutput, 0, MaxProvidersCount) queryCtx, cancel := context.WithCancel(ctx) defer cancel() provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount) + var wg sync.WaitGroup + var mu sync.Mutex + for provider := range provsCh { - addrs := make([]string, len(provider.Addrs)) - for i, addr := range provider.Addrs { - addrs[i] = addr.String() - } + wg.Add(1) + go func(provider peer.AddrInfo) { + defer wg.Done() - provOutput := providerOutput{ - ID: provider.ID.String(), - Addrs: addrs, - BitswapCheckOutput: BitswapCheckOutput{}, - } + addrs := make([]string, len(provider.Addrs)) + for i, addr := range provider.Addrs { + addrs[i] = addr.String() + } - testHost, err := d.createTestHost() - if err != nil { - return nil, fmt.Errorf("server error: %w", err) - } - defer testHost.Close() + provOutput := providerOutput{ + ID: provider.ID.String(), + Addrs: addrs, + BitswapCheckOutput: BitswapCheckOutput{}, + } - // Test Is the target connectable - dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) + testHost, err := d.createTestHost() + if err != nil { + log.Printf("Error creating test host: %v", err) + return + } + defer testHost.Close() - // we call NewStream to force NAT hole punching - // See https://github.com/libp2p/go-libp2p/issues/2714 - testHost.Connect(dialCtx, provider) - _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") - dialCancel() + // Test Is the target connectable + dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) + defer dialCancel() - if connErr != nil { - provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) - } else { - // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections - provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + // we call NewStream to force NAT hole punching + // See https://github.com/libp2p/go-libp2p/issues/2714 + testHost.Connect(dialCtx, provider) + _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") - for _, c := range testHost.Network().ConnsToPeer(provider.ID) { - provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) + if connErr != nil { + provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) + } else { + // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections + provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + + for _, c := range testHost.Network().ConnsToPeer(provider.ID) { + provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) + } } - } - out = append(out, provOutput) + mu.Lock() + out = append(out, provOutput) + mu.Unlock() + }(provider) } + wg.Wait() return &out, nil } From 99c8ffd4b3ea364a8a2f65db4608d8dafad3363b Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Mon, 26 Aug 2024 21:44:43 +0200 Subject: [PATCH 03/13] fix: use idiomatic approach with ctx termination --- daemon.go | 89 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/daemon.go b/daemon.go index a0c6758..8c3785e 100644 --- a/daemon.go +++ b/daemon.go @@ -133,57 +133,64 @@ func (d *daemon) runCidCheck(cidStr string) (*[]providerOutput, error) { var wg sync.WaitGroup var mu sync.Mutex - for provider := range provsCh { - wg.Add(1) - go func(provider peer.AddrInfo) { - defer wg.Done() - - addrs := make([]string, len(provider.Addrs)) - for i, addr := range provider.Addrs { - addrs[i] = addr.String() + for { + select { + case provider, ok := <-provsCh: + if !ok { + // Channel closed, all providers processed + return &out, nil } + wg.Add(1) + go func(provider peer.AddrInfo) { + defer wg.Done() - provOutput := providerOutput{ - ID: provider.ID.String(), - Addrs: addrs, - BitswapCheckOutput: BitswapCheckOutput{}, - } + addrs := make([]string, len(provider.Addrs)) + for i, addr := range provider.Addrs { + addrs[i] = addr.String() + } - testHost, err := d.createTestHost() - if err != nil { - log.Printf("Error creating test host: %v", err) - return - } - defer testHost.Close() + provOutput := providerOutput{ + ID: provider.ID.String(), + Addrs: addrs, + BitswapCheckOutput: BitswapCheckOutput{}, + } - // Test Is the target connectable - dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) - defer dialCancel() + testHost, err := d.createTestHost() + if err != nil { + log.Printf("Error creating test host: %v", err) + return + } + defer testHost.Close() - // we call NewStream to force NAT hole punching - // See https://github.com/libp2p/go-libp2p/issues/2714 - testHost.Connect(dialCtx, provider) - _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") + // Test Is the target connectable + dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) + defer dialCancel() - if connErr != nil { - provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) - } else { - // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections - provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + // we call NewStream to force NAT hole punching + // See https://github.com/libp2p/go-libp2p/issues/2714 + testHost.Connect(dialCtx, provider) + _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") - for _, c := range testHost.Network().ConnsToPeer(provider.ID) { - provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) + if connErr != nil { + provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) + } else { + // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections + provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + + for _, c := range testHost.Network().ConnsToPeer(provider.ID) { + provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) + } } - } - mu.Lock() - out = append(out, provOutput) - mu.Unlock() - }(provider) + mu.Lock() + out = append(out, provOutput) + mu.Unlock() + }(provider) + case <-ctx.Done(): + // Context cancelled + return &out, ctx.Err() + } } - - wg.Wait() - return &out, nil } type peerCheckOutput struct { From 23bef225a5cc067e454e16df4b025464a9f5c365 Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Tue, 27 Aug 2024 16:40:11 +0200 Subject: [PATCH 04/13] feat: run cid check and add test --- daemon.go | 97 +++++++++++++++++++++------------------------ integration_test.go | 28 +++++++++++++ main.go | 2 +- test/tools.go | 20 ++++++++++ web/index.html | 9 ++++- 5 files changed, 103 insertions(+), 53 deletions(-) diff --git a/daemon.go b/daemon.go index 8c3785e..7c9470c 100644 --- a/daemon.go +++ b/daemon.go @@ -38,7 +38,7 @@ type daemon struct { // number of providers at which to stop looking for providers in the DHT // When doing a check only with a CID -var MaxProvidersCount = 3 +var MaxProvidersCount = 10 func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) { rm, err := NewResourceManager() @@ -112,18 +112,18 @@ func (d *daemon) mustStart() { type providerOutput struct { ID string + ConnectionError string Addrs []string ConnectionMaddrs []string BitswapCheckOutput BitswapCheckOutput } -func (d *daemon) runCidCheck(cidStr string) (*[]providerOutput, error) { +func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (*[]providerOutput, error) { cid, err := cid.Decode(cidStr) if err != nil { return nil, err } - ctx := context.Background() out := make([]providerOutput, 0, MaxProvidersCount) queryCtx, cancel := context.WithCancel(ctx) @@ -133,64 +133,59 @@ func (d *daemon) runCidCheck(cidStr string) (*[]providerOutput, error) { var wg sync.WaitGroup var mu sync.Mutex - for { - select { - case provider, ok := <-provsCh: - if !ok { - // Channel closed, all providers processed - return &out, nil + for provider := range provsCh { + wg.Add(1) + go func(provider peer.AddrInfo) { + defer wg.Done() + addrs := make([]string, len(provider.Addrs)) + for i, addr := range provider.Addrs { + addrs[i] = addr.String() } - wg.Add(1) - go func(provider peer.AddrInfo) { - defer wg.Done() - addrs := make([]string, len(provider.Addrs)) - for i, addr := range provider.Addrs { - addrs[i] = addr.String() - } - - provOutput := providerOutput{ - ID: provider.ID.String(), - Addrs: addrs, - BitswapCheckOutput: BitswapCheckOutput{}, - } + provOutput := providerOutput{ + ID: provider.ID.String(), + Addrs: addrs, + BitswapCheckOutput: BitswapCheckOutput{}, + } + log.Printf("provider output: %v\n", provOutput) - testHost, err := d.createTestHost() - if err != nil { - log.Printf("Error creating test host: %v", err) - return - } - defer testHost.Close() + testHost, err := d.createTestHost() + if err != nil { + log.Printf("Error creating test host: %v\n", err) + return + } + defer testHost.Close() - // Test Is the target connectable - dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) - defer dialCancel() + // Test Is the target connectable + dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) + defer dialCancel() - // we call NewStream to force NAT hole punching - // See https://github.com/libp2p/go-libp2p/issues/2714 - testHost.Connect(dialCtx, provider) - _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") + // Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714 + testHost.Connect(dialCtx, provider) + _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") - if connErr != nil { - provOutput.BitswapCheckOutput.Error = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) - } else { - // TODO: Modify checkBitswapCID and vole to accept `AddrInfo` so that it can test any of the connections - provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, provider.Addrs[0]) + if connErr != nil { + provOutput.ConnectionError = fmt.Sprintf("error dialing to peer: %s", connErr.Error()) + } else { + // since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter + p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String()) + provOutput.BitswapCheckOutput = checkBitswapCID(ctx, testHost, cid, p2pAddr) - for _, c := range testHost.Network().ConnsToPeer(provider.ID) { - provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) - } + for _, c := range testHost.Network().ConnsToPeer(provider.ID) { + provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) } + } - mu.Lock() - out = append(out, provOutput) - mu.Unlock() - }(provider) - case <-ctx.Done(): - // Context cancelled - return &out, ctx.Err() - } + mu.Lock() + out = append(out, provOutput) + mu.Unlock() + }(provider) } + + // Wait for all goroutines to finish + wg.Wait() + + return &out, nil } type peerCheckOutput struct { diff --git a/integration_test.go b/integration_test.go index 6dd2da0..ae93617 100644 --- a/integration_test.go +++ b/integration_test.go @@ -157,4 +157,32 @@ func TestBasicIntegration(t *testing.T) { obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsFalse() obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue() }) + + t.Run("Data found on reachable peer with just cid", func(t *testing.T) { + testData := []byte(t.Name()) + mh, err := multihash.Sum(testData, multihash.SHA2_256, -1) + require.NoError(t, err) + testCid := cid.NewCidV1(cid.Raw, mh) + testBlock, err := blocks.NewBlockWithCid(testData, testCid) + require.NoError(t, err) + err = bstore.Put(ctx, testBlock) + require.NoError(t, err) + err = dhtClient.Provide(ctx, testCid, true) + require.NoError(t, err) + + res := test.Query(t, "http://localhost:1234", testCid.String()) + + res.Length().IsEqual(1) + res.Value(0).Object().Value("ID").String().IsEqual(h.ID().String()) + res.Value(0).Object().Value("ConnectionError").String().IsEmpty() + testHostAddrs := h.Addrs() + for _, addr := range testHostAddrs { + res.Value(0).Object().Value("Addrs").Array().ContainsAny(addr.String()) + } + + res.Value(0).Object().Value("ConnectionMaddrs").Array() + res.Value(0).Object().Value("BitswapCheckOutput").Object().Value("Error").String().IsEmpty() + res.Value(0).Object().Value("BitswapCheckOutput").Object().Value("Found").Boolean().IsTrue() + res.Value(0).Object().Value("BitswapCheckOutput").Object().Value("Responded").Boolean().IsTrue() + }) } diff --git a/main.go b/main.go index 1f85045..fa0761f 100644 --- a/main.go +++ b/main.go @@ -90,7 +90,7 @@ func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, m var data interface{} if maStr == "" { - data, err = d.runCidCheck(cidStr) + data, err = d.runCidCheck(r.Context(), cidStr) } else { data, err = d.runPeerCheck(maStr, cidStr) } diff --git a/test/tools.go b/test/tools.go index 39cc275..8ab1e9b 100644 --- a/test/tools.go +++ b/test/tools.go @@ -51,6 +51,26 @@ func Query( JSON(opts).Object() } +func QueryCid( + t *testing.T, + url string, + cid string, +) *httpexpect.Array { + expectedContentType := "application/json" + + opts := httpexpect.ContentOpts{ + MediaType: expectedContentType, + } + + e := httpexpect.Default(t, url) + + return e.GET("/check"). + WithQuery("cid", cid). + Expect(). + Status(http.StatusOK). + JSON(opts).Array() +} + func GetEnv(key string, fallback string) string { if value, ok := os.LookupEnv(key); ok { return value diff --git a/web/index.html b/web/index.html index e7621e4..1bdcfeb 100644 --- a/web/index.html +++ b/web/index.html @@ -28,7 +28,7 @@
+