From b8cd570cc0a6d903878b970dc5c8057270811b1f Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Fri, 18 Feb 2022 19:07:02 +0000 Subject: [PATCH] Handle `NoEntries` cid gracefully and improve cli output When encountering `NoEntries` CID, do not attempt to traverse it in `provider` CLI. Do not traverse entries of a removal advertisement even if a CID is present. Instead, print useufl messages to signal such cases. Improve output for human readability. Fixes #179 --- cmd/provider/internal/client.go | 64 ++++++++++++--------- cmd/provider/internal/client_store.go | 4 ++ cmd/provider/internal/entries_iter.go | 23 +++++++- cmd/provider/internal/options.go | 39 +++++++++++++ cmd/provider/list.go | 28 ++++++++- cmd/provider/verify_ingest.go | 81 +++++++++++++++++---------- 6 files changed, 179 insertions(+), 60 deletions(-) create mode 100644 cmd/provider/internal/options.go diff --git a/cmd/provider/internal/client.go b/cmd/provider/internal/client.go index 4afe550e..a27d5cba 100644 --- a/cmd/provider/internal/client.go +++ b/cmd/provider/internal/client.go @@ -9,7 +9,7 @@ import ( "github.com/filecoin-project/go-legs/dtsync" "github.com/filecoin-project/go-legs/httpsync" "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/traversal/selector" selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder" @@ -26,7 +26,9 @@ type ( close func() error syncer legs.Syncer store *ProviderClientStore - sel datamodel.Node + + adSel ipld.Node + entSel ipld.Node } ) @@ -47,7 +49,13 @@ func NewHttpProviderClient(provAddr peer.AddrInfo) (ProviderClient, error) { }, nil } -func NewGraphSyncProviderClient(provAddr peer.AddrInfo, topic string, entryRecursionLimit int64) (ProviderClient, error) { +func NewGraphSyncProviderClient(provAddr peer.AddrInfo, o ...Option) (ProviderClient, error) { + + opts, err := newOptions(o...) + if err != nil { + return nil, err + } + h, err := libp2p.New() if err != nil { return nil, err @@ -59,33 +67,25 @@ func NewGraphSyncProviderClient(provAddr peer.AddrInfo, topic string, entryRecur if err != nil { return nil, err } - syncer := dtSync.NewSyncer(provAddr.ID, topic) - - var erl selector.RecursionLimit - if entryRecursionLimit <= 0 { - erl = selector.RecursionLimitNone() - } else { - erl = selector.RecursionLimitDepth(entryRecursionLimit) - } + syncer := dtSync.NewSyncer(provAddr.ID, opts.topic) ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - oneAdWithEntries := ssb.ExploreUnion( - ssb.ExploreRecursive(selector.RecursionLimitDepth(1), ssb.ExploreFields( - func(efsb selectorbuilder.ExploreFieldsSpecBuilder) { - efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge()) - })), - ssb.ExploreRecursive(erl, ssb.ExploreFields( - func(efsb selectorbuilder.ExploreFieldsSpecBuilder) { - efsb.Insert("Next", ssb.ExploreRecursiveEdge()) - efsb.Insert("Entries", ssb.ExploreRecursiveEdge()) - })), - ).Node() + adSel := ssb.ExploreRecursive(selector.RecursionLimitDepth(1), ssb.ExploreFields( + func(efsb selectorbuilder.ExploreFieldsSpecBuilder) { + efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge()) + })).Node() + + entSel := ssb.ExploreRecursive(opts.entriesRecurLimit, ssb.ExploreFields( + func(efsb selectorbuilder.ExploreFieldsSpecBuilder) { + efsb.Insert("Next", ssb.ExploreRecursiveEdge()) + })).Node() return &providerGraphSyncClient{ close: dtSync.Close, syncer: syncer, store: store, - sel: oneAdWithEntries, + adSel: adSel, + entSel: entSel, }, nil } @@ -102,10 +102,24 @@ func (p *providerGraphSyncClient) GetAdvertisement(ctx context.Context, id cid.C id = head } - if err := p.syncer.Sync(ctx, id, p.sel); err != nil { + // Sync the advertisement without entries first. + if err := p.syncer.Sync(ctx, id, p.adSel); err != nil { + return nil, err + } + + // Load the synced advertisement from local store. + ad, err := p.store.getAdvertisement(ctx, id) + if err != nil { return nil, err } - return p.store.getAdvertisement(ctx, id) + + // Only sync its entries recursively if it is not a removal advertisement and has entries. + if !ad.IsRemove && ad.HasEntries() { + err = p.syncer.Sync(ctx, ad.Entries.root, p.entSel) + } + + // Return the partially synced advertisement useful for output to client. + return ad, err } func (p *providerGraphSyncClient) Close() error { diff --git a/cmd/provider/internal/client_store.go b/cmd/provider/internal/client_store.go index 12f56f4c..d2e660be 100644 --- a/cmd/provider/internal/client_store.go +++ b/cmd/provider/internal/client_store.go @@ -40,6 +40,10 @@ type ( } ) +func (a *Advertisement) HasEntries() bool { + return a.Entries.IsPresent() +} + func newProviderClientStore() *ProviderClientStore { store := dssync.MutexWrap(datastore.NewMapDatastore()) lsys := cidlink.DefaultLinkSystem() diff --git a/cmd/provider/internal/entries_iter.go b/cmd/provider/internal/entries_iter.go index bfc87a75..df2a0289 100644 --- a/cmd/provider/internal/entries_iter.go +++ b/cmd/provider/internal/entries_iter.go @@ -2,10 +2,12 @@ package internal import ( "context" + "io" + provider "github.com/filecoin-project/index-provider" + "github.com/filecoin-project/storetheindex/api/v0/ingest/schema" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" - "io" ) var ( @@ -28,12 +30,29 @@ type ( } ) +func (d *EntriesIterator) Root() cid.Cid { + return d.root +} + +func (d *EntriesIterator) IsPresent() bool { + return isPresent(d.root) +} + +func isPresent(c cid.Cid) bool { + return c != schema.NoEntries.Cid && c != cid.Undef +} + func (d *EntriesIterator) Next() (multihash.Multihash, error) { + + if !d.IsPresent() { + return nil, io.EOF + } + if d.chunkIter != nil && d.chunkIter.hasNext() { return d.chunkIter.Next() } - if d.next == cid.Undef { + if isPresent(d.next) { return nil, io.EOF } diff --git a/cmd/provider/internal/options.go b/cmd/provider/internal/options.go new file mode 100644 index 00000000..6385e3f1 --- /dev/null +++ b/cmd/provider/internal/options.go @@ -0,0 +1,39 @@ +package internal + +import "github.com/ipld/go-ipld-prime/traversal/selector" + +type ( + Option func(*options) error + + options struct { + entriesRecurLimit selector.RecursionLimit + topic string + } +) + +func newOptions(o ...Option) (*options, error) { + opts := &options{ + entriesRecurLimit: selector.RecursionLimitNone(), + topic: "/indexer/ingest/mainnet", + } + for _, apply := range o { + if err := apply(opts); err != nil { + return nil, err + } + } + return opts, nil +} + +func WithTopic(topic string) Option { + return func(o *options) error { + o.topic = topic + return nil + } +} + +func WithEntriesRecursionLimit(limit selector.RecursionLimit) Option { + return func(o *options) error { + o.entriesRecurLimit = limit + return nil + } +} diff --git a/cmd/provider/list.go b/cmd/provider/list.go index cb08afde..b8e26898 100644 --- a/cmd/provider/list.go +++ b/cmd/provider/list.go @@ -3,10 +3,12 @@ package main import ( "errors" "fmt" + "os" "github.com/filecoin-project/index-provider/cmd/provider/internal" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" @@ -93,13 +95,24 @@ func toProviderClient(addrStr string, topic string) (internal.ProviderClient, er if adEntriesRecurLimitFlagValue < 0 { return nil, fmt.Errorf("ad entries recursion depth limit cannot be less than zero; got %d", adEntriesRecurLimitFlagValue) } - return internal.NewGraphSyncProviderClient(addrInfo, topic, adEntriesRecurLimitFlagValue) + + var entRecurLim selector.RecursionLimit + if adEntriesRecurLimitFlagValue == 0 { + entRecurLim = selector.RecursionLimitNone() + } else { + entRecurLim = selector.RecursionLimitDepth(adEntriesRecurLimitFlagValue) + } + + return internal.NewGraphSyncProviderClient(addrInfo, internal.WithTopic(topic), internal.WithEntriesRecursionLimit(entRecurLim)) } func doGetAdvertisements(cctx *cli.Context) error { ad, err := provClient.GetAdvertisement(cctx.Context, adCid) if err != nil { - return err + if ad == nil { + return err + } + fmt.Fprintf(os.Stderr, "⚠️ Failed to fully sync advertisement %s. Output shows partially synced ad.\n Error: %s\n", adCid, err.Error()) } fmt.Printf("ID: %s\n", ad.ID) @@ -108,11 +121,20 @@ func doGetAdvertisements(cctx *cli.Context) error { fmt.Printf("Addresses: %v\n", ad.Addresses) fmt.Printf("Is Remove: %v\n", ad.IsRemove) + if ad.IsRemove { + if ad.HasEntries() { + fmt.Println("Entries: sync skipped") + fmt.Printf(" ⚠️ Removal advertisement with non-empty entries root cid: %s\n", ad.Entries.Root()) + } else { + fmt.Println("Entries: None") + } + return nil + } fmt.Println("Entries:") var entriesOutput string entries, err := ad.Entries.Drain() if err == datastore.ErrNotFound { - entriesOutput = "Note: More entries are available but not synced due to the configured entries recursion limit." + entriesOutput = "⚠️ Note: More entries were available but not synced due to the configured entries recursion limit or error during traversal." } else if err != nil { return err } diff --git a/cmd/provider/verify_ingest.go b/cmd/provider/verify_ingest.go index 9b69c703..1728b5c6 100644 --- a/cmd/provider/verify_ingest.go +++ b/cmd/provider/verify_ingest.go @@ -290,46 +290,62 @@ func doVerifyIngestFromProvider(cctx *cli.Context) error { adRecurLimitStr = fmt.Sprintf("%d", adRecurLimit) } var aggResult verifyIngestResult + rmCtxIDs := make(map[string]interface{}) for i := 1; i <= adRecurLimit; i++ { ad, err := provClient.GetAdvertisement(cctx.Context, adCid) if err != nil { - return err - } - - if ad.IsRemove { - // TODO implement verificiation when ad is about removal - // When implementing note that entries may be empty and if so we need to walk back the chain to get the list of entries. - return fmt.Errorf("ad %s is a removal advertisement; verifying ingest from removal advertisement is not yet supported", adCid) - } - - var entriesOutput string - allMhs, err := ad.Entries.Drain() - if err == datastore.ErrNotFound { - entriesOutput = "; skipped syncing the remaining chunks due to recursion limit" - } else if err != nil { - return err - } - - var mhs []multihash.Multihash - for _, mh := range allMhs { - if include() { - mhs = append(mhs, mh) + if ad == nil { + return err } + fmt.Fprintf(os.Stderr, "⚠️ Failed to fully sync advertisement %s. Output shows partially synced ad.\n Error: %s\n", adCid, err.Error()) } fmt.Printf("Advertisement ID: %s\n", ad.ID) fmt.Printf("Previous Advertisement ID: %s\n", ad.PreviousID) - fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", len(allMhs), ad.Entries.ChunkCount(), entriesOutput) fmt.Printf("Verifying ingest... (%d/%s)\n", i, adRecurLimitStr) - finder, err := httpfinderclient.New(indexerAddr) - if err != nil { - return err - } - result, err := verifyIngestFromMhs(finder, mhs) - if err != nil { - return err + ctxID := string(ad.ContextID) + if _, removed := rmCtxIDs[ctxID]; removed { + fmt.Println("🧹 Removed in later advertisements; skipping verification.") + } else if ad.IsRemove { + rmCtxIDs[ctxID] = nil + fmt.Println("✂️ Removal advertisement; skipping verification.") + } else if !ad.HasEntries() { + fmt.Println("Has no entries; skipping verification.") + + } else { + var entriesOutput string + allMhs, err := ad.Entries.Drain() + if err == datastore.ErrNotFound { + entriesOutput = "; skipped syncing the remaining chunks due to recursion limit or error while syncing." + } else if err != nil { + return err + } + + var mhs []multihash.Multihash + for _, mh := range allMhs { + if include() { + mhs = append(mhs, mh) + } + } + + fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", len(allMhs), ad.Entries.ChunkCount(), entriesOutput) + finder, err := httpfinderclient.New(indexerAddr) + if err != nil { + return err + } + result, err := verifyIngestFromMhs(finder, mhs) + if err != nil { + return err + } + fmt.Print("Verification: ") + if result.passedVerification() { + fmt.Println("✅ Pass") + } else { + fmt.Println("❌ Fail") + } + aggResult.add(result) } - aggResult.add(result) + fmt.Println("-----------------------") // Stop verification if there is no link to previous advertisement. if ad.PreviousID == cid.Undef { @@ -468,6 +484,11 @@ func (r *verifyIngestResult) printAndExit() error { fmt.Printf("sampling probability: %.2f\n", samplingProb) fmt.Printf("RNG seed: %d\n", rngSeed) fmt.Println() + + if r.total == 0 { + return cli.Exit("⚠️ Inconclusive; no multihashes were verified.", 0) + } + if r.passedVerification() { return cli.Exit("🎉 Passed verification check.", 0) }