From 022a05ec277daf403af5ef40704861a7eb07b1ac Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Mon, 28 Feb 2022 13:37:13 +0000 Subject: [PATCH] Make network announcements optional and introduce engine options Implement the ability to disable network announcements of published advertisements. Change the behaviour of engine, so that announcements are disabled by default until `PublisherKind` is set. Refactor the engine package, and introduce options with defaults such that the barrier to instantiate an engine is low with sensible defaults. Future PRs will refactor the config package to match the options. --- cmd/provider/daemon.go | 9 +- e2e_retrieve_test.go | 101 +++++++-------- engine/engine.go | 184 ++++++++++----------------- engine/engine_test.go | 31 ++--- engine/example_test.go | 85 ++----------- engine/linksystem_test.go | 12 +- engine/option.go | 32 ----- engine/options.go | 258 ++++++++++++++++++++++++++++++++++++++ go.mod | 1 + 9 files changed, 411 insertions(+), 302 deletions(-) delete mode 100644 engine/option.go create mode 100644 engine/options.go diff --git a/cmd/provider/daemon.go b/cmd/provider/daemon.go index 4279f826..d87d43a2 100644 --- a/cmd/provider/daemon.go +++ b/cmd/provider/daemon.go @@ -116,7 +116,14 @@ func daemonCommand(cctx *cli.Context) error { } // Starting provider core - eng, err := engine.New(cfg.Ingest, privKey, dt, h, ds, cfg.ProviderServer.RetrievalMultiaddrs) + eng, err := engine.New( + engine.WithDatastore(ds), + engine.WithDataTransfer(dt), + engine.WithHost(h), + engine.WithEntriesCacheCapacity(cfg.Ingest.LinkCacheSize), + engine.WithEntriesChunkSize(cfg.Ingest.LinkedChunkSize), + engine.WithTopicName(cfg.Ingest.PubSubTopic), + engine.WithPublisherKind(engine.PublisherKind(cfg.Ingest.PublisherKind))) if err != nil { return err } diff --git a/e2e_retrieve_test.go b/e2e_retrieve_test.go index cdc1e21e..b9c63302 100644 --- a/e2e_retrieve_test.go +++ b/e2e_retrieve_test.go @@ -3,6 +3,7 @@ package provider_test import ( "context" "net" + "os" "path/filepath" "strings" "testing" @@ -12,7 +13,6 @@ import ( "github.com/filecoin-project/go-legs" provider "github.com/filecoin-project/index-provider" "github.com/filecoin-project/index-provider/cardatatransfer" - "github.com/filecoin-project/index-provider/config" "github.com/filecoin-project/index-provider/engine" "github.com/filecoin-project/index-provider/metadata" "github.com/filecoin-project/index-provider/supplier" @@ -29,9 +29,7 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/test" "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -40,26 +38,47 @@ const testTopic = "test/topic" type testCase struct { name string - serverConfigOpts []func(*config.Ingest) + serverConfigOpts func(*testing.T) []engine.Option } var testCases = []testCase{ { - name: "DT Publisher", - serverConfigOpts: nil, + name: "DT Publisher", + serverConfigOpts: func(t *testing.T) []engine.Option { + // Use env var to signal what publisher kind is being used. + setPubKindEnvVarKey(t, engine.DataTransferPublisher) + return []engine.Option{ + engine.WithTopicName(testTopic), + engine.WithPublisherKind(engine.DataTransferPublisher), + } + }, }, { name: "HTTP Publisher", - serverConfigOpts: []func(*config.Ingest){ - func(c *config.Ingest) { - httpPublisherCfg := config.NewHttpPublisher() - c.PublisherKind = config.HttpPublisherKind - c.HttpPublisher = httpPublisherCfg - }, + serverConfigOpts: func(t *testing.T) []engine.Option { + // Use env var to signal what publisher kind is being used. + setPubKindEnvVarKey(t, engine.HttpPublisher) + return []engine.Option{ + engine.WithTopicName(testTopic), + engine.WithPublisherKind(engine.HttpPublisher), + } }, }, } +// setPubKindEnvVarKey to signal to newTestServer, which publisher kind is being used so that +// the test server can be configured correctly. +func setPubKindEnvVarKey(t *testing.T, kind engine.PublisherKind) { + // Set env var via direct call to os instead of t.SetEnv, because CI runs tests on 1.16 and + // that function is only available after 1.17 + key := pubKindEnvVarKey(t) + err := os.Setenv(key, string(kind)) + require.NoError(t, err) + t.Cleanup(func() { + os.Unsetenv(key) + }) +} + func TestRetrievalRoundTrip(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -73,7 +92,7 @@ func testRetrievalRoundTripWithTestCase(t *testing.T, tc testCase) { defer cancel() // Initialize everything - server := newTestServer(t, ctx, tc.serverConfigOpts...) + server := newTestServer(t, ctx, tc.serverConfigOpts(t)...) client := newTestClient(t) disseminateNetworkState(server.h, client.h) @@ -163,7 +182,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - server := newTestServer(t, ctx, tc.serverConfigOpts...) + server := newTestServer(t, ctx, tc.serverConfigOpts(t)...) client := newTestClient(t) disseminateNetworkState(server.h, client.h) @@ -257,47 +276,27 @@ type testServer struct { publisherAddr multiaddr.Multiaddr } -func newTestServer(t *testing.T, ctx context.Context, cfgOpts ...func(*config.Ingest)) *testServer { - priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"), libp2p.Identity(priv)) - require.NoError(t, err) - store := dssync.MutexWrap(datastore.NewMapDatastore()) - t.Cleanup(func() { - h.Close() - }) +func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testServer { + // Explicitly override host so that the host is known for testing purposes. + h := newHost(t) + store := dssync.MutexWrap(datastore.NewMapDatastore()) dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem()) - ingestCfg := config.Ingest{ - PubSubTopic: testTopic, - } - for _, f := range cfgOpts { - f(&ingestCfg) - } + o = append(o, engine.WithHost(h), engine.WithDatastore(store), engine.WithDataTransfer(dt)) var publisherAddr multiaddr.Multiaddr - if ingestCfg.PublisherKind == config.HttpPublisherKind { + pubKind := engine.PublisherKind(os.Getenv(pubKindEnvVarKey(t))) + if pubKind == engine.HttpPublisher { + var err error port := findOpenPort(t) - publisherAddr, err = multiaddr.NewMultiaddr(ingestCfg.HttpPublisher.ListenMultiaddr) + publisherAddr, err = multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/" + port + "/http") require.NoError(t, err) - - // Replace the default port with a port we know is open so that tests can - // run in parallel. - parts := multiaddr.Split(publisherAddr) - for i, p := range parts { - if p.Protocols()[0].Code == multiaddr.P_TCP { - parts[i], err = multiaddr.NewMultiaddr("/tcp/" + port) - require.NoError(t, err) - } - } - publisherAddr = multiaddr.Join(parts...) - ingestCfg.HttpPublisher.ListenMultiaddr = publisherAddr.String() - + o = append(o, engine.WithHttpPublisherListenAddr("0.0.0.0:"+port)) } else { publisherAddr = h.Addrs()[0] } - e, err := engine.New(ingestCfg, priv, dt, h, store, nil) + e, err := engine.New(o...) require.NoError(t, err) require.NoError(t, e.Start(ctx)) @@ -312,6 +311,10 @@ func newTestServer(t *testing.T, ctx context.Context, cfgOpts ...func(*config.In } } +func pubKindEnvVarKey(t *testing.T) string { + return t.Name() + "_publisher_kind" +} + type testClient struct { h host.Host dt datatransfer.Manager @@ -324,12 +327,7 @@ func newTestClient(t *testing.T) *testClient { blockStore := blockstore.NewBlockstore(store) lsys := storeutil.LinkSystemForBlockstore(blockStore) h := newHost(t) - t.Cleanup(func() { - h.Close() - }) - dt := testutil.SetupDataTransferOnHost(t, h, store, lsys) - return &testClient{ h: h, dt: dt, @@ -341,6 +339,9 @@ func newTestClient(t *testing.T) *testClient { func newHost(t *testing.T) host.Host { h, err := libp2p.New() require.NoError(t, err) + t.Cleanup(func() { + h.Close() + }) return h } diff --git a/engine/engine.go b/engine/engine.go index 597fc1b8..efcb7020 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -6,13 +6,11 @@ import ( "fmt" "sync" - dt "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-legs" "github.com/filecoin-project/go-legs/dtsync" "github.com/filecoin-project/go-legs/httpsync" provider "github.com/filecoin-project/index-provider" "github.com/filecoin-project/index-provider/cardatatransfer" - "github.com/filecoin-project/index-provider/config" "github.com/filecoin-project/index-provider/engine/chunker" stiapi "github.com/filecoin-project/storetheindex/api/v0" "github.com/filecoin-project/storetheindex/api/v0/ingest/schema" @@ -23,8 +21,6 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" ) const ( @@ -35,43 +31,20 @@ const ( linksCachePath = "/cache/links" ) -var dsLatestAdvKey = datastore.NewKey(latestAdvKey) -var log = logging.Logger("provider/engine") +var ( + log = logging.Logger("provider/engine") + + dsLatestAdvKey = datastore.NewKey(latestAdvKey) +) // Engine is an implementation of the core reference provider interface type Engine struct { - // privKey is the provider's privateKey - privKey crypto.PrivKey - // host is the libp2p host running the provider process - host host.Host - // dataTransfer is the data transfer module used by legs - dataTransfer dt.Manager - // addrs is a list of multiaddr strings which are the addresses advertised - // for content retrieval - addrs []string - // lsys is the main linksystem used for reference provider + *options lsys ipld.LinkSystem entriesChunker *chunker.CachedEntriesChunker - // ds is the datastore used for persistence of different assets (advertisements, - // indexed data, etc.) - ds datastore.Batching - - publisherKind config.PublisherKind - publisher legs.Publisher - extraGossipData []byte - - httpPublisherCfg *config.HttpPublisher - - // pubsubtopic where the provider will push advertisements - pubSubTopic string - linkedChunkSize int - - // purgeLinkCache indicates whether to purge the link cache on startup - purgeLinkCache bool - // linkCacheSize is the capacity of the link cache LRU - linkCacheSize int + publisher legs.Publisher // cb is the callback used in the linkSystem cb provider.Callback @@ -99,37 +72,14 @@ var _ provider.Interface = (*Engine)(nil) // // The engine must be started via Engine.Start before use and discarded via Engine.Shutdown when no longer needed. // See: Engine.Start, Engine.Shutdown. -func New(ingestCfg config.Ingest, privKey crypto.PrivKey, dt dt.Manager, h host.Host, ds datastore.Batching, retAddrs []string, options ...Option) (*Engine, error) { - var cfg engineConfig - err := cfg.apply(options) +func New(o ...Option) (*Engine, error) { + opts, err := newOptions(o...) if err != nil { return nil, err } - if len(retAddrs) == 0 { - retAddrs = []string{h.Addrs()[0].String()} - log.Infof("Retrieval address not configured, using %s", retAddrs[0]) - } - - ingestCfg.PopulateDefaults() - - // TODO(security): We should not keep the privkey decoded here. - // We should probably unlock it and lock it every time we need it. - // Once we start encrypting the key locally. e := &Engine{ - host: h, - dataTransfer: dt, - ds: ds, - privKey: privKey, - pubSubTopic: ingestCfg.PubSubTopic, - linkedChunkSize: ingestCfg.LinkedChunkSize, - purgeLinkCache: ingestCfg.PurgeLinkCache, - linkCacheSize: ingestCfg.LinkCacheSize, - - addrs: retAddrs, - httpPublisherCfg: &ingestCfg.HttpPublisher, - publisherKind: ingestCfg.PublisherKind, - extraGossipData: cfg.extraGossipData, + options: opts, } e.lsys = e.mkLinkSystem() @@ -137,20 +87,6 @@ func New(ingestCfg config.Ingest, privKey crypto.PrivKey, dt dt.Manager, h host. return e, nil } -// NewFromConfig instantiates a new engine by using the given config.Config. -// The instantiation gets the private key via config.Identity, and uses RetrievalMultiaddrs in -// config.ProviderServer as retrieval addresses in advertisements. -// -// See: engine.New . -func NewFromConfig(cfg config.Config, dt dt.Manager, host host.Host, ds datastore.Batching, options ...Option) (*Engine, error) { - log.Info("Instantiating a new index provider engine") - privKey, err := cfg.Identity.DecodePrivateKey("") - if err != nil { - return nil, fmt.Errorf("cannot decode private key: %s", err) - } - return New(cfg.Ingest, privKey, dt, host, ds, cfg.ProviderServer.RetrievalMultiaddrs, options...) -} - // Start starts the engine by instantiating the internal storage and joins the configured gossipsub // topic used for publishing advertisements. // @@ -160,12 +96,12 @@ func NewFromConfig(cfg config.Config, dt dt.Manager, host host.Host, ds datastor func (e *Engine) Start(ctx context.Context) error { // Create datastore entriesChunker entriesCacheDs := dsn.Wrap(e.ds, datastore.NewKey(linksCachePath)) - cachedChunker, err := chunker.NewCachedEntriesChunker(ctx, entriesCacheDs, e.linkedChunkSize, e.linkCacheSize) + cachedChunker, err := chunker.NewCachedEntriesChunker(ctx, entriesCacheDs, e.entChunkSize, e.entCacheCap) if err != nil { return err } - if e.purgeLinkCache { + if e.purgeCache { err := cachedChunker.Clear(ctx) if err != nil { return err @@ -174,18 +110,10 @@ func (e *Engine) Start(ctx context.Context) error { e.entriesChunker = cachedChunker - if e.publisherKind == config.HttpPublisherKind { - var addr string - addr, err = e.httpPublisherCfg.ListenNetAddr() - if err != nil { - return fmt.Errorf("cannot format http addr for httpPublisher: %s", err) - } - e.publisher, err = httpsync.NewPublisher(addr, e.lsys, e.host.ID(), e.privKey) - } else { - e.publisher, err = dtsync.NewPublisherFromExisting(e.dataTransfer, e.host, e.pubSubTopic, e.lsys, dtsync.WithExtraData(e.extraGossipData)) - } + e.publisher, err = e.newPublisher() if err != nil { - return fmt.Errorf("cannot initialize publisher: %s", err) + log.Errorw("Failed to instantiate legs publisher", "err", "err", "kind", e.pubKind) + return err } err = e.PublishLatest(ctx) @@ -197,6 +125,25 @@ func (e *Engine) Start(ctx context.Context) error { return nil } +func (e *Engine) newPublisher() (legs.Publisher, error) { + switch e.pubKind { + case NoPublisher: + log.Info("Remote announcements is disabled; all advertisements will only be store locally.") + return nil, nil + case DataTransferPublisher: + dtOpts := []dtsync.Option{dtsync.Topic(e.pubTopic), dtsync.WithExtraData(e.pubExtraGossipData)} + if e.pubDT != nil { + return dtsync.NewPublisherFromExisting(e.pubDT, e.h, e.pubTopicName, e.lsys, dtOpts...) + } + ds := dsn.Wrap(e.ds, datastore.NewKey("/legs/dtsync/pub")) + return dtsync.NewPublisher(e.h, ds, e.lsys, e.pubTopicName, dtOpts...) + case HttpPublisher: + return httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.h.ID(), e.key) + default: + return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind) + } +} + // PublishLocal stores the advertisement in the local link system and marks it locally as the latest // advertisement. // @@ -204,21 +151,19 @@ func (e *Engine) Start(ctx context.Context) error { // // See: Engine.Publish. func (e *Engine) PublishLocal(ctx context.Context, adv schema.Advertisement) (cid.Cid, error) { - adLnk, err := schema.AdvertisementLink(e.lsys, adv) + lnk, err := e.lsys.Store(ipld.LinkContext{Ctx: ctx}, schema.Linkproto, adv.Representation()) if err != nil { return cid.Undef, fmt.Errorf("cannot generate advertisement link: %s", err) } + c := lnk.(cidlink.Link).Cid + log := log.With("adCid", c) + log.Info("Stored ad in local link system") - c := adLnk.ToCid() - // Store latest advertisement published from the chain - // - // NOTE: The datastore should be thread-safe, if not we need a lock to - // protect races on this value. - log.Infow("Storing advertisement locally", "cid", c.String()) - err = e.putLatestAdv(ctx, c.Bytes()) - if err != nil { - return cid.Undef, fmt.Errorf("cannot store latest advertisement in blockstore: %s", err) + if err := e.putLatestAdv(ctx, c.Bytes()); err != nil { + log.Errorw("Failed to update reference to the latest advertisement", "err", err) + return cid.Undef, fmt.Errorf("failed to update reference to latest advertisement: %w", err) } + log.Info("Updated reference to the latest advertisement successfully") return c, nil } @@ -229,36 +174,45 @@ func (e *Engine) PublishLocal(ctx context.Context, adv schema.Advertisement) (ci // The publication mechanism uses legs.Publisher internally. // See: https://github.com/filecoin-project/go-legs func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid, error) { - // Store the advertisement locally. c, err := e.PublishLocal(ctx, adv) if err != nil { - return cid.Undef, fmt.Errorf("failed to publish advertisement locally: %s", err) + log.Errorw("Failed to store advertisement locally", "err", err) + return cid.Undef, fmt.Errorf("failed to publish advertisement locally: %w", err) } - log.Infow("Publishing advertisement in pubsub channel", "cid", c) - // Publish the advertisement. - err = e.publisher.UpdateRoot(ctx, c) - if err != nil { - return cid.Undef, err + // Only announce the advertisement CID if publisher is configured. + if e.publisher != nil { + log := log.With("adCid", c) + log.Info("Publishing advertisement in pubsub channel") + err = e.publisher.UpdateRoot(ctx, c) + if err != nil { + log.Errorw("Failed to announce advertisement on pubsub channel ", "err", err) + return cid.Undef, err + } } return c, nil } // PublishLatest re-publishes the latest existing advertisement to pubsub. func (e *Engine) PublishLatest(ctx context.Context) error { + // Skip announcing the latest advertisement CID if there is no publisher. + if e.publisher == nil { + log.Infow("Skipped announcing the latest: remote announcements are disabled.") + return nil + } + adCid, err := e.getLatestAdCid(ctx) if err != nil { - return fmt.Errorf("could not get latest advertisement cid from blockstore: %s", err) + log.Errorw("Failed to get the latest advertisement CID", "err", err) + return fmt.Errorf("failed to get latest advertisement cid from blockstore: %w", err) } if adCid == cid.Undef { - log.Info("No previously published advertisements") + log.Info("Skipped announcing the latest: no previously published advertisements.") return nil } - log.Infow("Republishing latest advertisement", "cid", adCid) - // Re-publish the advertisement. return e.publisher.UpdateRoot(ctx, adCid) } @@ -305,14 +259,14 @@ func (e *Engine) NotifyRemove(ctx context.Context, contextID []byte) (cid.Cid, e // The engine is no longer usable after the call to this function. func (e *Engine) Shutdown() error { var errs error - err := e.publisher.Close() - if err != nil { - errs = multierror.Append(errs, fmt.Errorf("error closing leg publisher: %s", err)) + if e.publisher != nil { + if err := e.publisher.Close(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("error closing leg publisher: %s", err)) + } } - if err = e.entriesChunker.Close(); err != nil { + if err := e.entriesChunker.Close(); err != nil { errs = multierror.Append(errs, fmt.Errorf("error closing link entriesChunker: %s", err)) } - return errs } @@ -480,8 +434,8 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad previousLnk = nb.Build().(schema.Link_Advertisement) } - adv, err := schema.NewAdvertisement(e.privKey, previousLnk, cidsLnk, - contextID, metadata, isRm, e.host.ID().String(), e.addrs) + adv, err := schema.NewAdvertisement(e.key, previousLnk, cidsLnk, + contextID, metadata, isRm, e.h.ID().String(), e.retrievalAddrsAsString()) if err != nil { return cid.Undef, fmt.Errorf("failed to create advertisement: %s", err) } diff --git a/engine/engine_test.go b/engine/engine_test.go index 5f5d4f6b..4b870990 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/go-legs" provider "github.com/filecoin-project/index-provider" - "github.com/filecoin-project/index-provider/config" "github.com/filecoin-project/index-provider/testutil" stiapi "github.com/filecoin-project/storetheindex/api/v0" "github.com/filecoin-project/storetheindex/api/v0/ingest/schema" @@ -127,20 +126,11 @@ func TestEngine_NotifyRemoveWithUnknownContextIDIsError(t *testing.T) { } func mkEngine(t *testing.T) *Engine { - ingestCfg := config.NewIngest() - ingestCfg.PubSubTopic = testTopic - return mkEngineWithConfig(t, ingestCfg) + return mkEngineWithOptions(t, WithTopicName(testTopic), WithPublisherKind(DataTransferPublisher)) } -func mkEngineWithConfig(t *testing.T, cfg config.Ingest) *Engine { - priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - h := mkTestHost(t) - - store := dssync.MutexWrap(datastore.NewMapDatastore()) - - dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem()) - engine, err := New(cfg, priv, dt, h, store, nil) +func mkEngineWithOptions(t *testing.T, o ...Option) *Engine { + engine, err := New(o...) require.NoError(t, err) err = engine.Start(context.Background()) require.NoError(t, err) @@ -249,7 +239,7 @@ func TestNotifyPublish(t *testing.T) { t.Cleanup(clean(ls, e, cncl)) // Connect subscribe with provider engine. - connectHosts(t, e.host, lh) + connectHosts(t, e.h, lh) // per https://github.com/libp2p/go-libp2p-pubsub/blob/e6ad80cf4782fca31f46e3a8ba8d1a450d562f49/gossipsub_test.go#L103 // we don't seem to have a way to manually trigger needed gossip-sub heartbeats for mesh establishment. @@ -290,7 +280,7 @@ func TestNotifyPutAndRemoveCids(t *testing.T) { t.Cleanup(clean(ls, e, cncl)) // Connect subscribe with provider engine. - connectHosts(t, e.host, lh) + connectHosts(t, e.h, lh) // per https://github.com/libp2p/go-libp2p-pubsub/blob/e6ad80cf4782fca31f46e3a8ba8d1a450d562f49/gossipsub_test.go#L103 // we don't seem to have a way to manually trigger needed gossip-sub heartbeats for mesh establishment. @@ -375,7 +365,7 @@ func TestNotifyPutWithCallback(t *testing.T) { t.Cleanup(clean(ls, e, cncl)) // Connect subscribe with provider engine. - connectHosts(t, e.host, lh) + connectHosts(t, e.h, lh) // per https://github.com/libp2p/go-libp2p-pubsub/blob/e6ad80cf4782fca31f46e3a8ba8d1a450d562f49/gossipsub_test.go#L103 // we don't seem to have a way to manually trigger needed gossip-sub heartbeats for mesh establishment. @@ -454,10 +444,9 @@ func skipFlaky(t *testing.T) { } func Test_EmptyConfigSetsDefaults(t *testing.T) { - engine, err := New(config.Ingest{}, nil, nil, mkTestHost(t), nil, nil) + engine, err := New() require.NoError(t, err) - require.True(t, engine.linkedChunkSize > 0) - require.True(t, engine.linkCacheSize > 0) - require.True(t, engine.pubSubTopic != "") - require.True(t, engine.pubSubTopic != "") + require.True(t, engine.entChunkSize > 0) + require.True(t, engine.entCacheCap > 0) + require.True(t, engine.pubTopicName != "") } diff --git a/engine/example_test.go b/engine/example_test.go index 469268d4..fb21ee92 100644 --- a/engine/example_test.go +++ b/engine/example_test.go @@ -2,28 +2,13 @@ package engine_test import ( "context" - "crypto/rand" - "errors" "fmt" "io" - "time" - datatransfer "github.com/filecoin-project/go-data-transfer" - dtimpl "github.com/filecoin-project/go-data-transfer/impl" - dtnetwork "github.com/filecoin-project/go-data-transfer/network" - gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" provider "github.com/filecoin-project/index-provider" - "github.com/filecoin-project/index-provider/config" "github.com/filecoin-project/index-provider/engine" "github.com/filecoin-project/index-provider/metadata" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - gsimpl "github.com/ipfs/go-graphsync/impl" - gsnet "github.com/ipfs/go-graphsync/network" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" "github.com/multiformats/go-multihash" ) @@ -33,9 +18,6 @@ import ( // Note that the advertisement published uses metadata.BitswapMetadata. This is for demonstrative // purposes only. The example does not set up the retrieval side for the content. func Example_advertiseHelloWorld() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Get the multihash of content to advertise content := "Hello World!" sayHelloCtxID := "Say hello" @@ -46,39 +28,22 @@ func Example_advertiseHelloWorld() { } fmt.Printf("✓ Generated content multihash: %s\n", mh.B58String()) - // Generate a random peer identity - priv, _, err := crypto.GenerateEd25519Key(rand.Reader) - if err != nil { - panic(err) - } - fmt.Println("✓ Generated random peer identity.") - // Create a new libp2p host - h, err := libp2p.New(libp2p.Identity(priv)) + h, err := libp2p.New() if err != nil { panic(err) } // Only print the first three characters to keep golang example output happy. - fmt.Printf("✓ Instantiated new libp2p host with peer ID: %s...\n", h.ID().String()[:3]) - - store := dssync.MutexWrap(datastore.NewMapDatastore()) - defer store.Close() - - // Set up datatrasfer and graphsync - dt, err := startDatatrasfer(h, ctx, store) - if err != nil { - panic(err) - } - fmt.Println("✓ Datatransfer manager is ready.") + fmt.Printf("✓ Instantiated new libp2p host with peer ID: %s...\n", h.ID().String()[:2]) - // Construct a new provider engine with default configuration. - cfg := config.NewIngest() - engine, err := engine.New(cfg, priv, dt, h, store, nil) + // Construct a new provider engine with given libp2p host that announces advertisements over + // gossipsub and datatrasfer/graphsync. + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) if err != nil { panic(err) } - fmt.Println("✓ Instantiated provider engine with config:") - fmt.Printf(" announcements topic: %s\n", cfg.PubSubTopic) + fmt.Println("✓ Instantiated provider engine") + defer engine.Shutdown() engine.RegisterCallback(func(ctx context.Context, contextID []byte) (provider.MultihashIterator, error) { if string(contextID) == sayHelloCtxID { @@ -110,45 +75,13 @@ func Example_advertiseHelloWorld() { //Output: //Preparing to advertise content: 'Hello World!' //✓ Generated content multihash: QmWvQxTqbG2Z9HPJgG57jjwR154cKhbtJenbyYTWkjgF3e - //✓ Generated random peer identity. - //✓ Instantiated new libp2p host with peer ID: 12D... - //✓ Datatransfer manager is ready. - //✓ Instantiated provider engine with config: - // announcements topic: /indexer/ingest/mainnet + //✓ Instantiated new libp2p host with peer ID: Qm... + //✓ Instantiated provider engine //✓ Registered callback for context ID: Say hello //✓ Provider engine started. //✓ Published advertisement for content with CID: bag... } -func startDatatrasfer(h host.Host, ctx context.Context, ds datastore.Batching) (datatransfer.Manager, error) { - gn := gsnet.NewFromLibp2pHost(h) - dtNet := dtnetwork.NewFromLibp2pHost(h) - gs := gsimpl.New(ctx, gn, cidlink.DefaultLinkSystem()) - tp := gstransport.NewTransport(h.ID(), gs) - dt, err := dtimpl.NewDataTransfer(ds, dtNet, tp) - if err != nil { - return nil, err - } - ready := make(chan error, 1) - dt.OnReady(func(err error) { - ready <- err - }) - if err := dt.Start(ctx); err != nil { - return nil, err - } - - timer := time.NewTimer(2 * time.Second) - select { - case readyErr := <-ready: - if readyErr != nil { - return nil, readyErr - } - case <-timer.C: - return nil, errors.New("timed out waiting for datatrasfer to be ready") - } - return dt, nil -} - type singleMhIterator struct { offset int mh multihash.Multihash diff --git a/engine/linksystem_test.go b/engine/linksystem_test.go index 83ee3115..dc457f2e 100644 --- a/engine/linksystem_test.go +++ b/engine/linksystem_test.go @@ -9,7 +9,6 @@ import ( "time" provider "github.com/filecoin-project/index-provider" - "github.com/filecoin-project/index-provider/config" "github.com/filecoin-project/index-provider/engine/chunker" "github.com/filecoin-project/index-provider/metadata" "github.com/filecoin-project/index-provider/testutil" @@ -99,20 +98,19 @@ func Test_EvictedCachedEntriesChainIsRegeneratedGracefully(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - cfg := config.NewIngest() - cfg.LinkedChunkSize = 2 - cfg.LinkCacheSize = 1 - subject := mkEngineWithConfig(t, cfg) + chunkSize := 2 + cacheCap := 1 + subject := mkEngineWithOptions(t, WithEntriesCacheCapacity(cacheCap), WithEntriesChunkSize(chunkSize)) ad1CtxID := []byte("first") ad1MhCount := 12 - wantAd1EntriesChainLen := ad1MhCount / cfg.LinkedChunkSize + wantAd1EntriesChainLen := ad1MhCount / chunkSize ad1Mhs, err := testutil.RandomCids(rng, ad1MhCount) require.NoError(t, err) ad2CtxID := []byte("second") ad2MhCount := 10 - wantAd2ChunkLen := ad2MhCount / cfg.LinkedChunkSize + wantAd2ChunkLen := ad2MhCount / chunkSize ad2Mhs, err := testutil.RandomCids(rng, ad2MhCount) require.NoError(t, err) diff --git a/engine/option.go b/engine/option.go deleted file mode 100644 index e6ed26e3..00000000 --- a/engine/option.go +++ /dev/null @@ -1,32 +0,0 @@ -package engine - -import "fmt" - -// engineConfig contains all options for engineConfiguring Engine. -type engineConfig struct { - extraGossipData []byte -} - -type Option func(*engineConfig) error - -// apply applies the given options to this engineConfig. -func (c *engineConfig) apply(opts []Option) error { - for i, opt := range opts { - if err := opt(c); err != nil { - return fmt.Errorf("option %d failed: %s", i, err) - } - } - return nil -} - -// WithExtraGossipData supplies extra data to include in the pubsub announcement. -func WithExtraGossipData(extraData []byte) Option { - return func(c *engineConfig) error { - if len(extraData) != 0 { - // Make copy for safety. - c.extraGossipData = make([]byte, len(extraData)) - copy(c.extraGossipData, extraData) - } - return nil - } -} diff --git a/engine/options.go b/engine/options.go new file mode 100644 index 00000000..ed859f97 --- /dev/null +++ b/engine/options.go @@ -0,0 +1,258 @@ +package engine + +import ( + "fmt" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/multiformats/go-multiaddr" +) + +const ( + // NoPublisher indicates that no announcements are made to the network and all advertisements + // are only stored locally. + NoPublisher PublisherKind = "" + + // DataTransferPublisher makes announcements over a gossipsub topic and exposes a + // datatransfer/graphsync server that allows peers in the network to sync advertisements. + DataTransferPublisher PublisherKind = "dtsync" + + // HttpPublisher exposes a HTTP server that announces published advertisements and allows peers + // in the network to sync them over raw HTTP transport. + HttpPublisher PublisherKind = "http" +) + +type ( + // PublisherKind represents the kind of publisher to use in order to announce a new + // advertisement to the network. + // See: WithPublisherKind, NoPublisher, DataTransferPublisher, HttpPublisher. + PublisherKind string + + // Option sets a configuration parameter for the provider engine. + Option func(*options) error + + options struct { + ds datastore.Batching + h host.Host + // key is always initialized from the host peerstore. + // Setting an explicit identity must not be exposed unless it is tightly coupled with the + // host identity. Otherwise, the signature of advertisement will not match the libp2p host + // ID. + key crypto.PrivKey + retrievalAddrs []multiaddr.Multiaddr + + pubKind PublisherKind + pubDT datatransfer.Manager + pubHttpListenAddr string + pubTopicName string + pubTopic *pubsub.Topic + pubExtraGossipData []byte + + entCacheCap int + entChunkSize int + purgeCache bool + } +) + +func newOptions(o ...Option) (*options, error) { + opts := &options{ + pubKind: NoPublisher, + pubHttpListenAddr: "0.0.0.0:3104", + pubTopicName: "/indexer/ingest/mainnet", + // Keep 1024 chunks in cache; keeps 256MiB if chunks are 0.25MiB. + entCacheCap: 1024, + // Multihashes are 128 bytes so 16384 results in 0.25MiB chunk when full. + entChunkSize: 16384, + purgeCache: false, + } + + for _, apply := range o { + if err := apply(opts); err != nil { + return nil, err + } + } + + if opts.ds == nil { + opts.ds = dssync.MutexWrap(datastore.NewMapDatastore()) + } + + if opts.h == nil { + h, err := libp2p.New() + if err != nil { + return nil, err + } + log.Infow("Libp2p host is not configured, but required; created a new host.", "id", h.ID()) + opts.h = h + } + + // Initialize private key from libp2p host + opts.key = opts.h.Peerstore().PrivKey(opts.h.ID()) + // Defensively check that host's self private key is indeed set. + if opts.key == nil { + return nil, fmt.Errorf("cannot find private key in self peerstore; libp2p host is misconfigured") + } + + if len(opts.retrievalAddrs) == 0 { + opts.retrievalAddrs = opts.h.Addrs() + log.Infow("Retrieval address not configured; using host listen addresses instead.", "retrievalAddrs", opts.retrievalAddrs) + } + + return opts, nil +} + +func (o *options) retrievalAddrsAsString() []string { + var ras []string + for _, ra := range o.retrievalAddrs { + ras = append(ras, ra.String()) + } + return ras +} + +// WithPurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine +// starts. +// If unset, cache is rehydrated from previously cached entries stored in datastore if present. +// See: WithDatastore. +func WithPurgeCacheOnStart(p bool) Option { + return func(o *options) error { + o.purgeCache = p + return nil + } +} + +// WithEntriesChunkSize sets the maximum number of multihashes to include in a single entries chunk. +// If unset, the default size of 16384 is used. +// +// See: WithEntriesCacheCapacity, chunker.CachedEntriesChunker +func WithEntriesChunkSize(s int) Option { + return func(o *options) error { + o.entChunkSize = s + return nil + } +} + +// WithEntriesCacheCapacity sets the maximum number of advertisement entries chains to cache. +// If unset, the default capacity of 1024 is used. +// +// The cache is evicted using LRU policy. Note that the capacity dictates the number of complete +// chains that are cached, not individual entry chunks. This means, the maximum storage used by the +// cache is a factor of capacity, chunk size and the length of multihashes in each chunk. +// +// As an example, for 128-bit long multihashes the cache with default capacity of 1024, and default +// chunk size of 16384 can grow up to 256MiB when full. +// +// See: WithEntriesChunkSize, chunker.CachedEntriesChunker. +func WithEntriesCacheCapacity(s int) Option { + return func(o *options) error { + o.entCacheCap = s + return nil + } +} + +// WithPublisherKind sets the kind of publisher used to announce new advertisements. +// If unset, advertisements are only stored locally and no announcements are made. +// See: PublisherKind. +func WithPublisherKind(k PublisherKind) Option { + return func(o *options) error { + o.pubKind = k + return nil + } +} + +// WithHttpPublisherListenAddr sets the net listen address for the HTTP publisher. +// If unset, the default net listen address of '0.0.0.0:3104' is used. +// +// Note that this option only takes effect if the PublisherKind is set to HttpPublisher. +// See: WithPublisherKind. +func WithHttpPublisherListenAddr(addr string) Option { + return func(o *options) error { + o.pubHttpListenAddr = addr + return nil + } +} + +// WithTopicName sets toe topic name on which pubsub announcements are published. +// To override the default pubsub configuration, use WithTopic. +// +// Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. +// See: WithPublisherKind. +func WithTopicName(t string) Option { + return func(o *options) error { + o.pubTopicName = t + return nil + } +} + +// WithTopic sets the pubsub topic on which new advertisements are announced. +// To use the default pubsub configuration with a specific topic name, use WithTopicName. If both +// options are specified, WithTopic takes presence. +// +// Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. +// See: WithPublisherKind. +func WithTopic(t *pubsub.Topic) Option { + return func(o *options) error { + o.pubTopic = t + return nil + } +} + +// WithDataTransfer sets the instance of datatransfer.Manager to use. +// If unspecified a new instance is created automatically. +// +// Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. +// See: WithPublisherKind. +func WithDataTransfer(dt datatransfer.Manager) Option { + return func(o *options) error { + o.pubDT = dt + return nil + } +} + +// WithHost specifies the host to which the provider engine belongs. +// If unspecified, a host is created automatically. +// See: libp2p.New. +func WithHost(h host.Host) Option { + return func(o *options) error { + o.h = h + return nil + } +} + +// WithDatastore sets the datastore that is used by the engine to store advertisements. +// If unspecified, an ephemeral in-memory datastore is used. +// See: datastore.NewMapDatastore. +func WithDatastore(ds datastore.Batching) Option { + return func(o *options) error { + o.ds = ds + return nil + } +} + +// WithRetrievalAddrs sets the addresses that specify where to get the content corresponding to an +// indexing advertisement. +// If unspecified, the libp2p host listen addresses are used. +// See: WithHost. +func WithRetrievalAddrs(addr ...multiaddr.Multiaddr) Option { + return func(o *options) error { + o.retrievalAddrs = addr + return nil + } +} + +// WithExtraGossipData supplies extra data to include in the pubsub announcement. +// Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. +// See: WithPublisherKind. +func WithExtraGossipData(extraData []byte) Option { + return func(o *options) error { + if len(extraData) != 0 { + // Make copy for safety. + o.pubExtraGossipData = make([]byte, len(extraData)) + copy(o.pubExtraGossipData, extraData) + } + return nil + } +} diff --git a/go.mod b/go.mod index 24db3186..ba7cfcaf 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/ipld/go-ipld-prime v0.14.4 github.com/libp2p/go-libp2p v0.18.0-rc1 github.com/libp2p/go-libp2p-core v0.14.0 + github.com/libp2p/go-libp2p-pubsub v0.6.1 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multicodec v0.4.0