From 1f7569d19e8aa3280612244fc9efdf15aa666178 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Thu, 21 Apr 2022 12:13:34 +0100 Subject: [PATCH] Infer index provider topic from network name by default Index provider integration uses a gossipsub topic to announce changes to the advertised content. The topic name was fixed to the default topic which is `/indexer/ingest/mainnet`. In the case of lotus, the gossipsub validators enforce a list of topics the instance is permitted to join by setting subscription filter option when `PubSub` instance is constructed via DI. Having the fixed topic name meant that any SP starting up a node on a network other than `mainnet` would have to override the default config to avoid the node crashing when index provider is enabled. Instead of a fixed default, the changes here infer the allowed indexer topic name from network name automatically if the topic configuration is left empty. Fixes #8510 --- .../en/default-lotus-miner-config.toml | 6 ++-- node/config/def.go | 6 ++-- node/config/def_test.go | 4 +-- node/config/doc_gen.go | 4 ++- node/config/types.go | 4 ++- node/modules/storageminer_idxprov.go | 28 ++++++++++++++----- 6 files changed, 37 insertions(+), 15 deletions(-) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index ef2c4f3433f..1e1b0369d46 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -291,11 +291,13 @@ #EntriesChunkSize = 16384 # TopicName sets the topic name on which the changes to the advertised content are announced. - # Defaults to '/indexer/ingest/mainnet' if not specified. + # If not explicitly specified, the topic name is automatically inferred from the network name + # in following format: '/indexer/ingest/' + # Defaults to empty, which implies the topic name is inferred from network name. # # type: string # env var: LOTUS_INDEXPROVIDER_TOPICNAME - #TopicName = "/indexer/ingest/mainnet" + #TopicName = "" # PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine # starts. By default, the cache is rehydrated from previously cached entries stored in diff --git a/node/config/def.go b/node/config/def.go index 0401b0e4401..04c51208281 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -199,8 +199,10 @@ func DefaultStorageMiner() *StorageMiner { Enable: true, EntriesCacheCapacity: 1024, EntriesChunkSize: 16384, - TopicName: "/indexer/ingest/mainnet", - PurgeCacheOnStart: false, + // The default empty TopicName means it is inferred from network name, in the following + // format: "/indexer/ingest/" + TopicName: "", + PurgeCacheOnStart: false, }, Subsystems: MinerSubsystemConfig{ diff --git a/node/config/def_test.go b/node/config/def_test.go index 9a450e66b82..d644ae33630 100644 --- a/node/config/def_test.go +++ b/node/config/def_test.go @@ -74,8 +74,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) { require.True(t, reflect.DeepEqual(c, c2)) } -func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) { +func TestDefaultStorageMiner_IsEmpty(t *testing.T) { subject := DefaultStorageMiner() require.True(t, subject.IndexProvider.Enable) - require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName) + require.Equal(t, "", subject.IndexProvider.TopicName) } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 7d3306d91bd..cf51fb13e2a 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -404,7 +404,9 @@ advertisements that include more multihashes than the configured EntriesChunkSiz Type: "string", Comment: `TopicName sets the topic name on which the changes to the advertised content are announced. -Defaults to '/indexer/ingest/mainnet' if not specified.`, +If not explicitly specified, the topic name is automatically inferred from the network name +in following format: '/indexer/ingest/' +Defaults to empty, which implies the topic name is inferred from network name.`, }, { Name: "PurgeCacheOnStart", diff --git a/node/config/types.go b/node/config/types.go index 3d50a14e629..b5b1fae7ed4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -186,7 +186,9 @@ type IndexProviderConfig struct { EntriesChunkSize int // TopicName sets the topic name on which the changes to the advertised content are announced. - // Defaults to '/indexer/ingest/mainnet' if not specified. + // If not explicitly specified, the topic name is automatically inferred from the network name + // in following format: '/indexer/ingest/' + // Defaults to empty, which implies the topic name is inferred from network name. TopicName string // PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index 365648691d5..fdb941128cc 100644 --- a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-address" provider "github.com/filecoin-project/index-provider" "github.com/filecoin-project/index-provider/engine" + "github.com/filecoin-project/lotus/build" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-core/host" @@ -24,8 +25,20 @@ type IdxProv struct { Datastore dtypes.MetadataDS } -func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { - return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { +func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) { + return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) { + topicName := cfg.TopicName + // If indexer topic name is left empty, infer it from the network name. + if topicName == "" { + // Use the same mechanism as the Dependency Injection (DI) to construct the topic name, + // so that we are certain it is consistent with the name allowed by the subscription + // filter. + // + // See: lp2p.GossipSub. + topicName = build.IndexerIngestTopic(nn) + log.Debugw("Inferred indexer topic from network name", "topic", topicName) + } + ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider")) var opts = []engine.Option{ engine.WithDatastore(ipds), @@ -33,24 +46,24 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo engine.WithRetrievalAddrs(marketHost.Addrs()...), engine.WithEntriesCacheCapacity(cfg.EntriesCacheCapacity), engine.WithEntriesChunkSize(cfg.EntriesChunkSize), - engine.WithTopicName(cfg.TopicName), + engine.WithTopicName(topicName), engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart), } llog := log.With( "idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), - "topic", cfg.TopicName, + "topic", topicName, "retAddrs", marketHost.Addrs()) // If announcements to the network are enabled, then set options for datatransfer publisher. if cfg.Enable { // Join the indexer topic using the market's pubsub instance. Otherwise, the provider // engine would create its own instance of pubsub down the line in go-legs, which has // no validators by default. - t, err := ps.Join(cfg.TopicName) + t, err := ps.Join(topicName) if err != nil { llog.Errorw("Failed to join indexer topic", "err", err) - return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err) + return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err) } // Get the miner ID and set as extra gossip data. @@ -62,9 +75,10 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo engine.WithExtraGossipData(ma.Bytes()), engine.WithTopic(t), ) - llog = llog.With("extraGossipData", ma) + llog = llog.With("extraGossipData", ma, "publisher", "data-transfer") } else { opts = append(opts, engine.WithPublisherKind(engine.NoPublisher)) + llog = llog.With("publisher", "none") } // Instantiate the index provider engine.