Skip to content

Commit

Permalink
Handle NoEntries cid gracefully and improve cli output
Browse files Browse the repository at this point in the history
When encountering `NoEntries` CID, do not attempt to traverse it in
`provider` CLI.

Do not traverse entries of a removal advertisement even if a CID is
present. Instead, print useful messages to signal such cases.

Improve output for human readability.

Fixes #179
  • Loading branch information
masih committed Feb 18, 2022
1 parent 9dbb62d commit b8cd570
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 60 deletions.
64 changes: 39 additions & 25 deletions cmd/provider/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/filecoin-project/go-legs/dtsync"
"github.com/filecoin-project/go-legs/httpsync"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder"
Expand All @@ -26,7 +26,9 @@ type (
close func() error
syncer legs.Syncer
store *ProviderClientStore
sel datamodel.Node

adSel ipld.Node
entSel ipld.Node
}
)

Expand All @@ -47,7 +49,13 @@ func NewHttpProviderClient(provAddr peer.AddrInfo) (ProviderClient, error) {
}, nil
}

func NewGraphSyncProviderClient(provAddr peer.AddrInfo, topic string, entryRecursionLimit int64) (ProviderClient, error) {
func NewGraphSyncProviderClient(provAddr peer.AddrInfo, o ...Option) (ProviderClient, error) {

opts, err := newOptions(o...)
if err != nil {
return nil, err
}

h, err := libp2p.New()
if err != nil {
return nil, err
Expand All @@ -59,33 +67,25 @@ func NewGraphSyncProviderClient(provAddr peer.AddrInfo, topic string, entryRecur
if err != nil {
return nil, err
}
syncer := dtSync.NewSyncer(provAddr.ID, topic)

var erl selector.RecursionLimit
if entryRecursionLimit <= 0 {
erl = selector.RecursionLimitNone()
} else {
erl = selector.RecursionLimitDepth(entryRecursionLimit)
}
syncer := dtSync.NewSyncer(provAddr.ID, opts.topic)

ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
oneAdWithEntries := ssb.ExploreUnion(
ssb.ExploreRecursive(selector.RecursionLimitDepth(1), ssb.ExploreFields(
func(efsb selectorbuilder.ExploreFieldsSpecBuilder) {
efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge())
})),
ssb.ExploreRecursive(erl, ssb.ExploreFields(
func(efsb selectorbuilder.ExploreFieldsSpecBuilder) {
efsb.Insert("Next", ssb.ExploreRecursiveEdge())
efsb.Insert("Entries", ssb.ExploreRecursiveEdge())
})),
).Node()
adSel := ssb.ExploreRecursive(selector.RecursionLimitDepth(1), ssb.ExploreFields(
func(efsb selectorbuilder.ExploreFieldsSpecBuilder) {
efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge())
})).Node()

entSel := ssb.ExploreRecursive(opts.entriesRecurLimit, ssb.ExploreFields(
func(efsb selectorbuilder.ExploreFieldsSpecBuilder) {
efsb.Insert("Next", ssb.ExploreRecursiveEdge())
})).Node()

return &providerGraphSyncClient{
close: dtSync.Close,
syncer: syncer,
store: store,
sel: oneAdWithEntries,
adSel: adSel,
entSel: entSel,
}, nil
}

Expand All @@ -102,10 +102,24 @@ func (p *providerGraphSyncClient) GetAdvertisement(ctx context.Context, id cid.C
id = head
}

if err := p.syncer.Sync(ctx, id, p.sel); err != nil {
// Sync the advertisement without entries first.
if err := p.syncer.Sync(ctx, id, p.adSel); err != nil {
return nil, err
}

// Load the synced advertisement from local store.
ad, err := p.store.getAdvertisement(ctx, id)
if err != nil {
return nil, err
}
return p.store.getAdvertisement(ctx, id)

// Only sync its entries recursively if it is not a removal advertisement and has entries.
if !ad.IsRemove && ad.HasEntries() {
err = p.syncer.Sync(ctx, ad.Entries.root, p.entSel)
}

// Return the partially synced advertisement useful for output to client.
return ad, err
}

func (p *providerGraphSyncClient) Close() error {
Expand Down
4 changes: 4 additions & 0 deletions cmd/provider/internal/client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type (
}
)

// HasEntries reports whether this advertisement's Entries is present, as
// decided by Entries.IsPresent.
func (a *Advertisement) HasEntries() bool {
return a.Entries.IsPresent()
}

func newProviderClientStore() *ProviderClientStore {
store := dssync.MutexWrap(datastore.NewMapDatastore())
lsys := cidlink.DefaultLinkSystem()
Expand Down
23 changes: 21 additions & 2 deletions cmd/provider/internal/entries_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package internal

import (
"context"
"io"

provider "github.com/filecoin-project/index-provider"
"github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"io"
)

var (
Expand All @@ -28,12 +30,29 @@ type (
}
)

// Root returns the root CID held by this iterator.
func (d *EntriesIterator) Root() cid.Cid {
return d.root
}

// IsPresent reports whether the iterator's root CID passes the isPresent
// check.
func (d *EntriesIterator) IsPresent() bool {
	root := d.root
	return isPresent(root)
}

// isPresent reports whether c is neither the undefined CID (cid.Undef) nor
// the schema.NoEntries CID.
func isPresent(c cid.Cid) bool {
	switch c {
	case cid.Undef, schema.NoEntries.Cid:
		return false
	default:
		return true
	}
}

func (d *EntriesIterator) Next() (multihash.Multihash, error) {

if !d.IsPresent() {
return nil, io.EOF
}

if d.chunkIter != nil && d.chunkIter.hasNext() {
return d.chunkIter.Next()
}

if d.next == cid.Undef {
if isPresent(d.next) {
return nil, io.EOF
}

Expand Down
39 changes: 39 additions & 0 deletions cmd/provider/internal/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package internal

import "github.com/ipld/go-ipld-prime/traversal/selector"

type (
// Option is a function that adjusts an options value and may return an
// error.
Option func(*options) error

// options groups the settings that Option functions can modify.
options struct {
// entriesRecurLimit is the selector recursion limit configured for entries.
entriesRecurLimit selector.RecursionLimit
// topic is the configured topic name.
topic string
}
)

// newOptions builds an options value starting from the defaults (no entries
// recursion limit, topic "/indexer/ingest/mainnet") and then applies each
// given Option in order. The first Option that fails aborts construction and
// its error is returned with a nil result.
func newOptions(o ...Option) (*options, error) {
	var cfg options
	cfg.entriesRecurLimit = selector.RecursionLimitNone()
	cfg.topic = "/indexer/ingest/mainnet"

	for i := range o {
		if err := o[i](&cfg); err != nil {
			return nil, err
		}
	}
	return &cfg, nil
}

// WithTopic returns an Option that sets the topic to the given value. The
// returned Option never fails.
func WithTopic(topic string) Option {
	setTopic := func(opts *options) error {
		opts.topic = topic
		return nil
	}
	return setTopic
}

// WithEntriesRecursionLimit returns an Option that sets the entries recursion
// limit to the given value. The returned Option never fails.
func WithEntriesRecursionLimit(limit selector.RecursionLimit) Option {
	setLimit := func(opts *options) error {
		opts.entriesRecurLimit = limit
		return nil
	}
	return setLimit
}
28 changes: 25 additions & 3 deletions cmd/provider/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package main
import (
"errors"
"fmt"
"os"

"github.com/filecoin-project/index-provider/cmd/provider/internal"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -93,13 +95,24 @@ func toProviderClient(addrStr string, topic string) (internal.ProviderClient, er
if adEntriesRecurLimitFlagValue < 0 {
return nil, fmt.Errorf("ad entries recursion depth limit cannot be less than zero; got %d", adEntriesRecurLimitFlagValue)
}
return internal.NewGraphSyncProviderClient(addrInfo, topic, adEntriesRecurLimitFlagValue)

var entRecurLim selector.RecursionLimit
if adEntriesRecurLimitFlagValue == 0 {
entRecurLim = selector.RecursionLimitNone()
} else {
entRecurLim = selector.RecursionLimitDepth(adEntriesRecurLimitFlagValue)
}

return internal.NewGraphSyncProviderClient(addrInfo, internal.WithTopic(topic), internal.WithEntriesRecursionLimit(entRecurLim))
}

func doGetAdvertisements(cctx *cli.Context) error {
ad, err := provClient.GetAdvertisement(cctx.Context, adCid)
if err != nil {
return err
if ad == nil {
return err
}
fmt.Fprintf(os.Stderr, "⚠️ Failed to fully sync advertisement %s. Output shows partially synced ad.\n Error: %s\n", adCid, err.Error())
}

fmt.Printf("ID: %s\n", ad.ID)
Expand All @@ -108,11 +121,20 @@ func doGetAdvertisements(cctx *cli.Context) error {
fmt.Printf("Addresses: %v\n", ad.Addresses)
fmt.Printf("Is Remove: %v\n", ad.IsRemove)

if ad.IsRemove {
if ad.HasEntries() {
fmt.Println("Entries: sync skipped")
fmt.Printf(" ⚠️ Removal advertisement with non-empty entries root cid: %s\n", ad.Entries.Root())
} else {
fmt.Println("Entries: None")
}
return nil
}
fmt.Println("Entries:")
var entriesOutput string
entries, err := ad.Entries.Drain()
if err == datastore.ErrNotFound {
entriesOutput = "Note: More entries are available but not synced due to the configured entries recursion limit."
entriesOutput = "⚠️ Note: More entries were available but not synced due to the configured entries recursion limit or error during traversal."
} else if err != nil {
return err
}
Expand Down
81 changes: 51 additions & 30 deletions cmd/provider/verify_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,46 +290,62 @@ func doVerifyIngestFromProvider(cctx *cli.Context) error {
adRecurLimitStr = fmt.Sprintf("%d", adRecurLimit)
}
var aggResult verifyIngestResult
rmCtxIDs := make(map[string]interface{})
for i := 1; i <= adRecurLimit; i++ {
ad, err := provClient.GetAdvertisement(cctx.Context, adCid)
if err != nil {
return err
}

if ad.IsRemove {
// TODO implement verification when ad is about removal
// When implementing note that entries may be empty and if so we need to walk back the chain to get the list of entries.
return fmt.Errorf("ad %s is a removal advertisement; verifying ingest from removal advertisement is not yet supported", adCid)
}

var entriesOutput string
allMhs, err := ad.Entries.Drain()
if err == datastore.ErrNotFound {
entriesOutput = "; skipped syncing the remaining chunks due to recursion limit"
} else if err != nil {
return err
}

var mhs []multihash.Multihash
for _, mh := range allMhs {
if include() {
mhs = append(mhs, mh)
if ad == nil {
return err
}
fmt.Fprintf(os.Stderr, "⚠️ Failed to fully sync advertisement %s. Output shows partially synced ad.\n Error: %s\n", adCid, err.Error())
}

fmt.Printf("Advertisement ID: %s\n", ad.ID)
fmt.Printf("Previous Advertisement ID: %s\n", ad.PreviousID)
fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", len(allMhs), ad.Entries.ChunkCount(), entriesOutput)
fmt.Printf("Verifying ingest... (%d/%s)\n", i, adRecurLimitStr)
finder, err := httpfinderclient.New(indexerAddr)
if err != nil {
return err
}
result, err := verifyIngestFromMhs(finder, mhs)
if err != nil {
return err
ctxID := string(ad.ContextID)
if _, removed := rmCtxIDs[ctxID]; removed {
fmt.Println("🧹 Removed in later advertisements; skipping verification.")
} else if ad.IsRemove {
rmCtxIDs[ctxID] = nil
fmt.Println("✂️ Removal advertisement; skipping verification.")
} else if !ad.HasEntries() {
fmt.Println("Has no entries; skipping verification.")

} else {
var entriesOutput string
allMhs, err := ad.Entries.Drain()
if err == datastore.ErrNotFound {
entriesOutput = "; skipped syncing the remaining chunks due to recursion limit or error while syncing."
} else if err != nil {
return err
}

var mhs []multihash.Multihash
for _, mh := range allMhs {
if include() {
mhs = append(mhs, mh)
}
}

fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", len(allMhs), ad.Entries.ChunkCount(), entriesOutput)
finder, err := httpfinderclient.New(indexerAddr)
if err != nil {
return err
}
result, err := verifyIngestFromMhs(finder, mhs)
if err != nil {
return err
}
fmt.Print("Verification: ")
if result.passedVerification() {
fmt.Println("✅ Pass")
} else {
fmt.Println("❌ Fail")
}
aggResult.add(result)
}
aggResult.add(result)
fmt.Println("-----------------------")

// Stop verification if there is no link to previous advertisement.
if ad.PreviousID == cid.Undef {
Expand Down Expand Up @@ -468,6 +484,11 @@ func (r *verifyIngestResult) printAndExit() error {
fmt.Printf("sampling probability: %.2f\n", samplingProb)
fmt.Printf("RNG seed: %d\n", rngSeed)
fmt.Println()

if r.total == 0 {
return cli.Exit("⚠️ Inconclusive; no multihashes were verified.", 0)
}

if r.passedVerification() {
return cli.Exit("🎉 Passed verification check.", 0)
}
Expand Down

0 comments on commit b8cd570

Please sign in to comment.