diff --git a/README.md b/README.md index 111563d7..ece333be 100644 --- a/README.md +++ b/README.md @@ -21,19 +21,26 @@ git clone https://github.com/filecoin-project/venus-market.git cd venus-market make ``` -## how to setup venus-market +## how to set up venus-market -run as venus-pool service +run: + +- run in chain service +```shell script +./venus-market run --auth-url= --node-url= --messager-url= --gateway-url= --cs-token= --signer-type="gateway" +``` + +- run in local, conn venus chain service and use lotus-wallet/venus-wallet to sign ```shell script -./venus-market pool-run --auth-url --node-url --messager-url --gateway-url --auth-token --payment-addr +./venus-market run --auth-url= --node-url= --messager-url= --cs-token= --signer-type="wallet" --signer-url= --signer-token= ``` -run in local +- run in local, conn lotus full node and use lotus full node to sign ```shell script -./venus-market solo-run --node-url --node-token --signer-url --signer-token --payment-addr +./venus-market run --node-url= --messager-url= --cs-token= --signer-type="lotusnode" ``` -set peerid and address +set peer id and address ```shell script ./venus-market net listen #query venus-market address and peerid diff --git a/api/clients/mix_msgclient.go b/api/clients/mix_msgclient.go index d456b5cc..f39e7dd9 100644 --- a/api/clients/mix_msgclient.go +++ b/api/clients/mix_msgclient.go @@ -37,7 +37,7 @@ type MixMsgClient struct { full v1api.FullNode venusMessager IVenusMessager signer signer.ISigner - nonceAssign *nonceAssigner + nonceAssign INonceAssigner } func NewMixMsgClient(params MPoolReplaceParams) IMixMessage { diff --git a/api/clients/modules.go b/api/clients/modules.go index d0218b8c..98965538 100644 --- a/api/clients/modules.go +++ b/api/clients/modules.go @@ -3,51 +3,26 @@ package clients import ( logging "github.com/ipfs/go-log/v2" - "github.com/ipfs-force-community/metrics" "github.com/ipfs-force-community/venus-common-utils/builder" - "github.com/ipfs-force-community/venus-gateway/marketevent" - - gwTypes "github.com/ipfs-force-community/venus-gateway/types" "github.com/filecoin-project/venus-market/v2/api/clients/signer" "github.com/filecoin-project/venus-market/v2/config" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" - gwAPI "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" ) var log = logging.Logger("clients") -func NewMarketEvent(mctx metrics.MetricsCtx) (gwAPI.IMarketEvent, error) { - stream := marketevent.NewMarketEventStream(mctx, &localMinerValidator{}, gwTypes.DefaultConfig()) - return stream, nil -} - -var ClientsOpts = func(server bool, mode string, msgCfg *config.Messager, signerCfg *config.Signer) builder.Option { - opts := builder.Options( +var ClientsOpts = func(server bool, msgCfg *config.Messager, signerCfg *config.Signer) builder.Option { + return builder.Options( builder.Override(new(IMixMessage), NewMixMsgClient), builder.Override(new(signer.ISigner), signer.NewISignerClient(server)), builder.ApplyIf( func(s *builder.Settings) bool { return len(msgCfg.Url) > 0 }, - builder.Override(new(IVenusMessager), MessagerClient)), - ) - - if server { - return builder.Options(opts, - builder.Override(new(v1api.FullNode), NodeClient), - - builder.ApplyIf( - func(s *builder.Settings) bool { - return mode == "solo" - }, - builder.Override(new(gwAPI.IMarketEvent), NewMarketEvent), - ), - ) - } - - return builder.Options(opts, + builder.Override(new(IVenusMessager), MessagerClient), + ), builder.Override(new(v1api.FullNode), NodeClient), ) } diff --git a/api/clients/nonce_assign.go b/api/clients/nonce_assign.go index 39f022ed..fb7f9510 100644 --- a/api/clients/nonce_assign.go +++ b/api/clients/nonce_assign.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/filecoin-project/go-address" + v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" ) @@ -17,11 +18,11 @@ type nonceAssigner struct { full v1api.FullNode } -func newNonceAssign(full v1api.FullNode) *nonceAssigner { +func newNonceAssign(full v1api.FullNode) INonceAssigner { return &nonceAssigner{full: full, lk: sync.Mutex{}} } -// AssignNonce assign next nonce for address, in solo mode, should use a separate address for market message, should save nonce +// AssignNonce assign next nonce for address, should use a separate address for market message, should save nonce // when only connect one daemon, MpoolGetNonce works well, but may have conflict nonce if use multiple daemon behind proxy // todo save assgined nonce in local database func (nonceAssign *nonceAssigner) AssignNonce(ctx context.Context, addr address.Address) (uint64, error) { diff --git a/api/clients/storage_miner.go b/api/clients/storage_miner.go index 74878f8e..dea2d37f 100644 --- a/api/clients/storage_miner.go +++ b/api/clients/storage_miner.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" ) +// MarketRequestEvent todo no use type MarketRequestEvent interface { IsUnsealed(ctx context.Context, miner address.Address, pieceCid cid.Cid, sector storage.SectorRef, offset types.PaddedByteIndex, size abi.PaddedPieceSize) (bool, error) // SectorsUnsealPiece will Unseal a Sealed sector file for the given sector. diff --git a/api/impl/market_event.go b/api/impl/market_event.go deleted file mode 100644 index 56591766..00000000 --- a/api/impl/market_event.go +++ /dev/null @@ -1,33 +0,0 @@ -package impl - -import ( - "context" - "fmt" - - "go.uber.org/fx" - - v2GatewayAPI "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" - "github.com/filecoin-project/venus/venus-shared/types/gateway" -) - -type MarketEventAPI struct { - fx.In - - Event v2GatewayAPI.IMarketEvent `optional:"true"` -} - -var errNotSupportGateWayMode = fmt.Errorf("MarketEvent api supported only when it runs in 'solo' mode") - -func (marketEvent *MarketEventAPI) ResponseMarketEvent(ctx context.Context, resp *gateway.ResponseEvent) error { - if marketEvent.Event == nil { - return errNotSupportGateWayMode - } - return marketEvent.Event.ResponseMarketEvent(ctx, resp) -} - -func (marketEvent *MarketEventAPI) ListenMarketEvent(ctx context.Context, policy *gateway.MarketRegisterPolicy) (<-chan *gateway.RequestEvent, error) { - if marketEvent.Event == nil { - return nil, errNotSupportGateWayMode - } - return marketEvent.Event.ListenMarketEvent(ctx, policy) -} diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go index a5c35d9a..969316c9 100644 --- a/api/impl/venus_market.go +++ b/api/impl/venus_market.go @@ -9,11 +9,6 @@ import ( "sort" "time" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/pkg/errors" "go.uber.org/fx" "github.com/filecoin-project/dagstore" @@ -27,6 +22,11 @@ import ( "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin/v8/paych" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/filecoin-project/venus-auth/jwtclient" @@ -43,21 +43,24 @@ import ( "github.com/filecoin-project/venus/pkg/constants" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" - marketapi "github.com/filecoin-project/venus/venus-shared/api/market" + gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" + marketAPI "github.com/filecoin-project/venus/venus-shared/api/market" vTypes "github.com/filecoin-project/venus/venus-shared/types" + gatewayTypes "github.com/filecoin-project/venus/venus-shared/types/gateway" types "github.com/filecoin-project/venus/venus-shared/types/market" ) var ( - _ marketapi.IMarket = (*MarketNodeImpl)(nil) + _ marketAPI.IMarket = (*MarketNodeImpl)(nil) log = logging.Logger("market_api") ) type MarketNodeImpl struct { - FundAPI - MarketEventAPI fx.In + FundAPI + gatewayAPIV2.IMarketServiceProvider + FullNode v1api.FullNode Host host.Host StorageProvider storageprovider.StorageProvider @@ -114,6 +117,14 @@ type MarketNodeImpl struct { SetMaxMarketBalanceAddFeeFunc config.SetMaxMarketBalanceAddFeeFunc } +func (m MarketNodeImpl) ResponseMarketEvent(ctx context.Context, resp *gatewayTypes.ResponseEvent) error { + return m.IMarketServiceProvider.ResponseMarketEvent(ctx, resp) +} + +func (m MarketNodeImpl) ListenMarketEvent(ctx context.Context, policy *gatewayTypes.MarketRegisterPolicy) (<-chan *gatewayTypes.RequestEvent, error) { + return m.IMarketServiceProvider.ListenMarketEvent(ctx, policy) +} + func (m MarketNodeImpl) ActorList(ctx context.Context) ([]types.User, error) { return m.UserMgr.ActorList(ctx) } @@ -478,7 +489,7 @@ func (m *MarketNodeImpl) MarketDataTransferPath(ctx context.Context, mAddr addre return m.TransferPathFunc(mAddr) } -func (m *MarketNodeImpl) MarketDataSetTransferPath(ctx context.Context, mAddr address.Address, path string) error { +func (m *MarketNodeImpl) MarketSetDataTransferPath(ctx context.Context, mAddr address.Address, path string) error { return m.SetTransferPathFunc(mAddr, path) } diff --git a/cmd/market-client/main.go b/cmd/market-client/main.go index ba3b6290..c95ad7b3 100644 --- a/cmd/market-client/main.go +++ b/cmd/market-client/main.go @@ -274,7 +274,7 @@ func marketClient(cctx *cli.Context) error { config.ConfigClientOpts(cfg), - clients2.ClientsOpts(false, "", &cfg.Messager, &cfg.Signer), + clients2.ClientsOpts(false, &cfg.Messager, &cfg.Signer), models.DBOptions(false, nil), network.NetworkOpts(false, cfg.SimultaneousTransfersForStorage, 0, cfg.SimultaneousTransfersForRetrieval), paychmgr.PaychOpts, diff --git a/cmd/venus-market/main.go b/cmd/venus-market/main.go index dc4e1393..ecf7e48f 100644 --- a/cmd/venus-market/main.go +++ b/cmd/venus-market/main.go @@ -36,34 +36,31 @@ var ( Name: "node-url", Usage: "url to connect to daemon service", } - NodeTokenFlag = &cli.StringFlag{ - Name: "node-token", - Usage: "node token", - } AuthUrlFlag = &cli.StringFlag{ Name: "auth-url", Usage: "url to connect to auth service", } - AuthTokeFlag = &cli.StringFlag{ - Name: "auth-token", - Usage: "token for connect venus components", - } MessagerUrlFlag = &cli.StringFlag{ Name: "messager-url", Usage: "url to connect messager service", } - MessagerTokenFlag = &cli.StringFlag{ - Name: "messager-token", - Usage: "messager token", - Hidden: true, + + GatewayUrlFlag = &cli.StringFlag{ + Name: "gateway-url", + Usage: "used to connect gateway service for sign", + } + + ChainServiceTokenFlag = &cli.StringFlag{ + Name: "cs-token", + Usage: "chain service token", } SignerTypeFlag = &cli.StringFlag{ - Name: "signer-type", - Usage: "signer service type(lotusnode, wallet, gateway)", - Hidden: false, + Name: "signer-type", + Usage: "signer service type(lotusnode, wallet, gateway)", + Value: "wallet", } SignerUrlFlag = &cli.StringFlag{ Name: "signer-url", @@ -74,24 +71,10 @@ var ( Usage: "auth token for connect signer service", } - GatewayUrlFlag = &cli.StringFlag{ - Name: "gateway-url", - Usage: "used to connect gateway service for sign", - } - GatewayTokenFlag = &cli.StringFlag{ - Name: "gateway-token", - Usage: "used to connect gateway service for sign", - } - MysqlDsnFlag = &cli.StringFlag{ Name: "mysql-dsn", Usage: "mysql connection string", } - - RetrievalPaymentAddress = &cli.StringFlag{ - Name: "payment-addr", - Usage: "payment address for retrieval, eg. f01000", - } ) func main() { @@ -104,8 +87,7 @@ func main() { RepoFlag, }, Commands: []*cli.Command{ - soloRunCmd, - poolRunCmd, + runCmd, cli2.PiecesCmd, cli2.RetrievalDealsCmd, cli2.StorageDealsCmd, diff --git a/cmd/venus-market/pool-run.go b/cmd/venus-market/run.go similarity index 58% rename from cmd/venus-market/pool-run.go rename to cmd/venus-market/run.go index ed7deff8..c7690937 100644 --- a/cmd/venus-market/pool-run.go +++ b/cmd/venus-market/run.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "os" "github.com/gorilla/mux" "github.com/urfave/cli/v2" @@ -15,6 +16,7 @@ import ( "github.com/filecoin-project/venus-market/v2/api/clients" "github.com/filecoin-project/venus-market/v2/api/impl" cli2 "github.com/filecoin-project/venus-market/v2/cli" + "github.com/filecoin-project/venus-market/v2/cmd" "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/dagstore" "github.com/filecoin-project/venus-market/v2/fundmgr" @@ -34,32 +36,126 @@ import ( "github.com/filecoin-project/venus/venus-shared/api/permission" ) -var poolRunCmd = &cli.Command{ - Name: "pool-run", - Usage: "Run the market daemon in pool mode", +var runCmd = &cli.Command{ + Name: "run", + Usage: "Run the market daemon", Flags: []cli.Flag{ NodeUrlFlag, - NodeTokenFlag, AuthUrlFlag, - AuthTokeFlag, MessagerUrlFlag, - MessagerTokenFlag, GatewayUrlFlag, - GatewayTokenFlag, + ChainServiceTokenFlag, SignerTypeFlag, SignerUrlFlag, SignerTokenFlag, MysqlDsnFlag, - RetrievalPaymentAddress, }, - Action: poolDaemon, + Action: runDaemon, } -func poolDaemon(cctx *cli.Context) error { +func flagData(cctx *cli.Context, cfg *config.MarketConfig) error { + if cctx.IsSet(NodeUrlFlag.Name) { + cfg.Node.Url = cctx.String(NodeUrlFlag.Name) + } + + if cctx.IsSet(AuthUrlFlag.Name) { + cfg.AuthNode.Url = cctx.String(AuthUrlFlag.Name) + } + + if cctx.IsSet(MessagerUrlFlag.Name) { + cfg.Messager.Url = cctx.String(MessagerUrlFlag.Name) + } + + // chain service token + if cctx.IsSet(ChainServiceTokenFlag.Name) { + csToken := cctx.String(ChainServiceTokenFlag.Name) + cfg.Node.Token = csToken + cfg.Messager.Token = csToken + cfg.AuthNode.Token = csToken + } + + signerType := cctx.String(SignerTypeFlag.Name) + switch signerType { + case config.SignerTypeGateway: + if cctx.IsSet(GatewayUrlFlag.Name) { + cfg.Signer.Url = cctx.String(GatewayUrlFlag.Name) + } + + if cctx.IsSet(ChainServiceTokenFlag.Name) { + cfg.Signer.Token = cctx.String(ChainServiceTokenFlag.Name) + } + case config.SignerTypeWallet: + if cctx.IsSet(SignerUrlFlag.Name) { + cfg.Signer.Url = cctx.String(SignerUrlFlag.Name) + } + + if cctx.IsSet(SignerTokenFlag.Name) { + cfg.Signer.Token = cctx.String(SignerTokenFlag.Name) + } + case config.SignerTypeLotusnode: + if cctx.IsSet(NodeUrlFlag.Name) { + cfg.Signer.Url = cctx.String(NodeUrlFlag.Name) + } + + if cctx.IsSet(ChainServiceTokenFlag.Name) { + cfg.Signer.Token = cctx.String(ChainServiceTokenFlag.Name) + } + default: + return fmt.Errorf("unsupport signer type %s", signerType) + } + cfg.Signer.SignerType = signerType + + if cctx.IsSet(MysqlDsnFlag.Name) { + cfg.Mysql.ConnectionString = cctx.String(MysqlDsnFlag.Name) + } + + return nil +} + +func prepare(cctx *cli.Context) (*config.MarketConfig, error) { + cfg := config.DefaultMarketConfig + cfg.HomeDir = cctx.String(RepoFlag.Name) + cfgPath, err := cfg.ConfigPath() + if err != nil { + return nil, err + } + + mainLog.Info("load config from path ", cfgPath) + if _, err := os.Stat(cfgPath); os.IsNotExist(err) { + //create + err = flagData(cctx, cfg) + if err != nil { + return nil, fmt.Errorf("parser data from flag: %w", err) + } + + err = config.SaveConfig(cfg) + if err != nil { + return nil, fmt.Errorf("save config to %s: %w", cfgPath, err) + } + } else if err == nil { + //loadConfig + err = config.LoadConfig(cfgPath, cfg) + if err != nil { + return nil, err + } + + err = flagData(cctx, cfg) + if err != nil { + return nil, fmt.Errorf("parser data from flag: %w", err) + } + } else { + return nil, err + } + + return cfg, cmd.FetchAndLoadBundles(cctx.Context, cfg.Node) +} + +func runDaemon(cctx *cli.Context) error { utils.SetupLogLevels() - cfg, err := prepare(cctx, config.SignerTypeGateway) + + cfg, err := prepare(cctx) if err != nil { - return fmt.Errorf("prepare pool run failed:%w", err) + return fmt.Errorf("prepare run failed: %w", err) } // Configuration sanity check @@ -98,7 +194,7 @@ func poolDaemon(cctx *cli.Context) error { minermgr.MinerMgrOpts(), // clients - clients.ClientsOpts(true, "pool", &cfg.Messager, &cfg.Signer), + clients.ClientsOpts(true, &cfg.Messager, &cfg.Signer), models.DBOptions(true, &cfg.Mysql), network.NetworkOpts(true, cfg.SimultaneousTransfersForRetrieval, cfg.SimultaneousTransfersForStoragePerClient, cfg.SimultaneousTransfersForStorage), piecestorage.PieceStorageOpts(&cfg.PieceStorage), diff --git a/cmd/venus-market/solo-run.go b/cmd/venus-market/solo-run.go deleted file mode 100644 index a7909e84..00000000 --- a/cmd/venus-market/solo-run.go +++ /dev/null @@ -1,130 +0,0 @@ -package main - -import ( - "fmt" - - "go.uber.org/fx" - - "github.com/gorilla/mux" - "github.com/urfave/cli/v2" - - "github.com/ipfs-force-community/venus-common-utils/builder" - "github.com/ipfs-force-community/venus-common-utils/journal" - - "github.com/filecoin-project/venus-market/v2/api/clients" - "github.com/filecoin-project/venus-market/v2/api/impl" - cli2 "github.com/filecoin-project/venus-market/v2/cli" - "github.com/filecoin-project/venus-market/v2/config" - "github.com/filecoin-project/venus-market/v2/dagstore" - "github.com/filecoin-project/venus-market/v2/fundmgr" - "github.com/filecoin-project/venus-market/v2/metrics" - "github.com/filecoin-project/venus-market/v2/minermgr" - "github.com/filecoin-project/venus-market/v2/models" - "github.com/filecoin-project/venus-market/v2/network" - "github.com/filecoin-project/venus-market/v2/paychmgr" - "github.com/filecoin-project/venus-market/v2/piecestorage" - "github.com/filecoin-project/venus-market/v2/retrievalprovider" - "github.com/filecoin-project/venus-market/v2/rpc" - "github.com/filecoin-project/venus-market/v2/storageprovider" - types2 "github.com/filecoin-project/venus-market/v2/types" - "github.com/filecoin-project/venus-market/v2/utils" - - "github.com/filecoin-project/venus-auth/jwtclient" - - marketapi "github.com/filecoin-project/venus/venus-shared/api/market" - "github.com/filecoin-project/venus/venus-shared/api/permission" -) - -var soloRunCmd = &cli.Command{ - Name: "solo-run", - Usage: "Run the market daemon in solo mode", - Flags: []cli.Flag{ - NodeUrlFlag, - NodeTokenFlag, - SignerTypeFlag, - SignerUrlFlag, - SignerTokenFlag, - MysqlDsnFlag, - RetrievalPaymentAddress, - }, - Action: soloDaemon, -} - -func soloDaemon(cctx *cli.Context) error { - utils.SetupLogLevels() - - cfg, err := prepare(cctx, config.SignerTypeWallet) - if err != nil { - return fmt.Errorf("prepare solo run failed: %w", err) - } - - // Configuration sanity check - if len(cfg.AuthNode.Url) > 0 { - return fmt.Errorf("solo mode does not need to configure auth node") - } - - if len(cfg.Signer.Url) == 0 { - return fmt.Errorf("the signer node must be configured") - } - - ctx := cctx.Context - - resAPI := &impl.MarketNodeImpl{} - shutdownChan := make(chan struct{}) - closeFunc, err := builder.New(ctx, - builder.Override(new(journal.DisabledEvents), journal.EnvDisabledEvents), - builder.Override(new(journal.Journal), func(lc fx.Lifecycle, home config.IHome, disabled journal.DisabledEvents) (journal.Journal, error) { - return journal.OpenFilesystemJournal(lc, home.MustHomePath(), "venus-market", disabled) - }), - - metrics.MetricsOpts("venus-market", &cfg.Metrics), - builder.Override(new(types2.ShutdownChan), shutdownChan), - - // override marketconfig - builder.Override(new(config.MarketConfig), cfg), - - // config - config.ConfigServerOpts(cfg), - - // 'solo' mode doesn't needs a 'AuthClient' of venus-auth, - // provide a nil 'AuthClient', just for making 'NewUserMgrImpl' happy - builder.Override(new(*jwtclient.AuthClient), func() *jwtclient.AuthClient { return nil }), - - // user manager - minermgr.MinerMgrOpts(), - - // clients - clients.ClientsOpts(true, "solo", &cfg.Messager, &cfg.Signer), - models.DBOptions(true, &cfg.Mysql), - network.NetworkOpts(true, cfg.SimultaneousTransfersForRetrieval, cfg.SimultaneousTransfersForStoragePerClient, cfg.SimultaneousTransfersForStorage), - piecestorage.PieceStorageOpts(&cfg.PieceStorage), - fundmgr.FundMgrOpts, - dagstore.DagstoreOpts, - paychmgr.PaychOpts, - // Markets - storageprovider.StorageProviderOpts(cfg), - retrievalprovider.RetrievalProviderOpts(cfg), - - func(s *builder.Settings) error { - s.Invokes[ExtractApiKey] = builder.InvokeOption{ - Priority: 10, - Option: fx.Populate(resAPI), - } - return nil - }, - ) - if err != nil { - return fmt.Errorf("initializing node: %w", err) - } - defer closeFunc(ctx) //nolint - - finishCh := utils.MonitorShutdown(shutdownChan) - - router := mux.NewRouter() - router.Handle("resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)) - - var fullAPI marketapi.IMarketStruct - permission.PermissionProxy(marketapi.IMarket(resAPI), &fullAPI) - - return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, nil, &fullAPI, finishCh) -} diff --git a/cmd/venus-market/utils.go b/cmd/venus-market/utils.go deleted file mode 100644 index a5cbfcee..00000000 --- a/cmd/venus-market/utils.go +++ /dev/null @@ -1,142 +0,0 @@ -package main - -import ( - "fmt" - "os" - - "github.com/urfave/cli/v2" - - "github.com/filecoin-project/go-address" - - "github.com/filecoin-project/venus-market/v2/cmd" - "github.com/filecoin-project/venus-market/v2/config" -) - -func flagData(cctx *cli.Context, cfg *config.MarketConfig) error { - if cctx.IsSet(NodeUrlFlag.Name) { - cfg.Node.Url = cctx.String(NodeUrlFlag.Name) - } - - if cctx.IsSet(MessagerUrlFlag.Name) { - cfg.Messager.Url = cctx.String(MessagerUrlFlag.Name) - } - - if cctx.IsSet(AuthUrlFlag.Name) { - cfg.AuthNode.Url = cctx.String(AuthUrlFlag.Name) - } - - signerType := cctx.String(SignerTypeFlag.Name) - switch signerType { - case config.SignerTypeGateway: - { - if cctx.IsSet(GatewayUrlFlag.Name) { - cfg.Signer.Url = cctx.String(GatewayUrlFlag.Name) - } - if cctx.IsSet(GatewayTokenFlag.Name) { - cfg.Signer.Token = cctx.String(GatewayTokenFlag.Name) - } - } - case config.SignerTypeWallet: - { - if cctx.IsSet(SignerUrlFlag.Name) { - cfg.Signer.Url = cctx.String(SignerUrlFlag.Name) - } - if cctx.IsSet(SignerTokenFlag.Name) { - cfg.Signer.Token = cctx.String(SignerTokenFlag.Name) - } - } - case config.SignerTypeLotusnode: - { - if cctx.IsSet(NodeUrlFlag.Name) { - cfg.Signer.Url = cctx.String(NodeUrlFlag.Name) - } - if cctx.IsSet(NodeTokenFlag.Name) { - cfg.Signer.Token = cctx.String(NodeTokenFlag.Name) - } - } - default: - return fmt.Errorf("unsupport signer type %s", signerType) - } - cfg.Signer.SignerType = signerType - - if cctx.IsSet(AuthTokeFlag.Name) { - cfg.Node.Token = cctx.String(AuthTokeFlag.Name) - - if len(cfg.AuthNode.Url) > 0 { - cfg.AuthNode.Token = cctx.String(AuthTokeFlag.Name) - } - - if len(cfg.Messager.Url) > 0 { - cfg.Messager.Token = cctx.String(AuthTokeFlag.Name) - } - - if len(cfg.Signer.Url) > 0 { - cfg.Signer.Token = cctx.String(AuthTokeFlag.Name) - } - } - - if cctx.IsSet(NodeTokenFlag.Name) { - cfg.Node.Token = cctx.String(NodeTokenFlag.Name) - } - if cctx.IsSet(MessagerTokenFlag.Name) { - cfg.Messager.Token = cctx.String(MessagerTokenFlag.Name) - } - - if cctx.IsSet(MysqlDsnFlag.Name) { - cfg.Mysql.ConnectionString = cctx.String(MysqlDsnFlag.Name) - } - - if cctx.IsSet(RetrievalPaymentAddress.Name) { - addrStr := cctx.String(RetrievalPaymentAddress.Name) - addr, err := address.NewFromString(addrStr) - if err != nil { - return fmt.Errorf("flag provide a wrong address %s %w", addrStr, err) - } - - cfg.RetrievalPaymentAddress = config.Address(addr) - } - return nil -} - -func prepare(cctx *cli.Context, defSignerType config.SignerType) (*config.MarketConfig, error) { - if !cctx.IsSet(SignerTypeFlag.Name) { - if err := cctx.Set(SignerTypeFlag.Name, defSignerType); err != nil { - return nil, fmt.Errorf("set `%s` with wallet failed %w", SignerTypeFlag.Name, err) - } - } - cfg := config.DefaultMarketConfig - cfg.HomeDir = cctx.String(RepoFlag.Name) - cfgPath, err := cfg.ConfigPath() - if err != nil { - return nil, err - } - - mainLog.Info("load config from path ", cfgPath) - if _, err := os.Stat(cfgPath); os.IsNotExist(err) { - //create - err = flagData(cctx, cfg) - if err != nil { - return nil, fmt.Errorf("parser data from flag %w", err) - } - - err = config.SaveConfig(cfg) - if err != nil { - return nil, fmt.Errorf("save config to %s %w", cfgPath, err) - } - } else if err == nil { - //loadConfig - err = config.LoadConfig(cfgPath, cfg) - if err != nil { - return nil, err - } - - err = flagData(cctx, cfg) - if err != nil { - return nil, fmt.Errorf("parser data from flag %w", err) - } - } else { - return nil, err - } - - return cfg, cmd.FetchAndLoadBundles(cctx.Context, cfg.Node) -} diff --git a/config/common.go b/config/common.go index 0510b8ac..745365d1 100644 --- a/config/common.go +++ b/config/common.go @@ -37,7 +37,7 @@ type Common struct { Libp2p Libp2p } -// chain-service connect config +// ConnectConfig chain-service connect config type ConnectConfig struct { Url string Token string @@ -46,11 +46,9 @@ type ConnectConfig struct { type ( Node ConnectConfig Messager ConnectConfig - Market ConnectConfig AuthNode ConnectConfig ) -// signer config type SignerType = string const ( @@ -60,12 +58,12 @@ const ( ) type Signer struct { - SignerType SignerType `toml:"Type"` // wallet/gateway + SignerType SignerType `toml:"Type"` Url string Token string } -// common config for provider +// ProviderConfig is common config for provider type ProviderConfig struct { // When enabled, the miner can accept online deals ConsiderOnlineStorageDeals bool diff --git a/config/def_config.go b/config/def_config.go index fe087248..b7c81d35 100644 --- a/config/def_config.go +++ b/config/def_config.go @@ -31,20 +31,21 @@ var DefaultMarketConfig = &MarketConfig{ }, }, Node: Node{ - Url: "", // "/ip4//tcp/3453", - Token: "", + Url: "/ip4//tcp/3453", + Token: "", }, Messager: Messager{ - Url: "", // /ip4//tcp/39812 - Token: "", - }, - Signer: Signer{ - Url: "", // /ip4//tcp/5678 - Token: "", + Url: "/ip4//tcp/39812", + Token: "", }, AuthNode: AuthNode{ - Url: "", // "http://:8989", - Token: "", + Url: "http://:8989", + Token: "", + }, + Signer: Signer{ + SignerType: "wallet", + Url: "/ip4//tcp/5678", + Token: "", }, Mysql: Mysql{ ConnectionString: "", @@ -73,7 +74,7 @@ var DefaultMarketConfig = &MarketConfig{ } var DefaultMarketClientConfig = &MarketClientConfig{ - Home: Home{"~/.venusclient"}, + Home: Home{"~/.marketclient"}, Common: Common{ API: API{ ListenAddress: "/ip4/127.0.0.1/tcp/41231/ws", @@ -89,16 +90,16 @@ var DefaultMarketClientConfig = &MarketClientConfig{ }, }, Node: Node{ - Url: "", // "/ip4//tcp/3453", - Token: "", + Url: "/ip4//tcp/3453", + Token: "", }, Signer: Signer{ - Url: "", // "/ip4//tcp/5678", - Token: "", + Url: "/ip4//tcp/5678", + Token: "", }, Messager: Messager{ - Url: "", // "/ip4//tcp/39812", - Token: "", + Url: "/ip4//tcp/39812", + Token: "", }, DefaultMarketAddress: Address(address.Undef), SimultaneousTransfersForStorage: DefaultSimultaneousTransfers, diff --git a/dagstore/market_api.go b/dagstore/market_api.go index b7c3801d..135c666f 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - marketMetrics "github.com/filecoin-project/venus-market/v2/metrics" "github.com/ipfs-force-community/metrics" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -15,6 +14,9 @@ import ( "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-padreader" + gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" + + marketMetrics "github.com/filecoin-project/venus-market/v2/metrics" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/piecestorage" "github.com/filecoin-project/venus-market/v2/utils" @@ -28,20 +30,22 @@ type MarketAPI interface { } type marketAPI struct { - pieceStorageMgr *piecestorage.PieceStorageManager - pieceRepo repo.StorageDealRepo - useTransient bool - metricsCtx metrics.MetricsCtx + pieceStorageMgr *piecestorage.PieceStorageManager + pieceRepo repo.StorageDealRepo + useTransient bool + metricsCtx metrics.MetricsCtx + gatewayMarketClient gatewayAPIV2.IMarketClient } var _ MarketAPI = (*marketAPI)(nil) -func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, useTransient bool) MarketAPI { +func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI { return &marketAPI{ - pieceRepo: repo.StorageDealRepo(), - pieceStorageMgr: pieceStorageMgr, - useTransient: useTransient, - metricsCtx: ctx, + pieceRepo: repo.StorageDealRepo(), + pieceStorageMgr: pieceStorageMgr, + useTransient: useTransient, + metricsCtx: ctx, + gatewayMarketClient: gatewayMarketClient, } } @@ -56,6 +60,7 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err } return true, nil // todo check isunseal from miner + // m.gatewayMarketClient.IsUnsealed() } func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) { @@ -111,7 +116,6 @@ func (m *marketAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (u len := pieceInfo.Deals[0].Length - // todo is this size need to convert to unpad size return uint64(len), nil } diff --git a/dagstore/market_api_test.go b/dagstore/market_api_test.go index fac99125..a596b204 100644 --- a/dagstore/market_api_test.go +++ b/dagstore/market_api_test.go @@ -53,7 +53,8 @@ func TestMarket(t *testing.T) { _, err = memPieceStorage.SaveTo(ctx, testResourceId.String(), payloadWriter) assert.Nil(t, err) - marketAPI := NewMarketAPI(ctx, r, pmgr, false) + // todo: mock IMarketEvent + marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false) size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId) assert.Nil(t, err) diff --git a/dagstore/modules.go b/dagstore/modules.go index 3c31baba..ecec639a 100644 --- a/dagstore/modules.go +++ b/dagstore/modules.go @@ -15,6 +15,8 @@ import ( "github.com/ipfs-force-community/metrics" "github.com/ipfs-force-community/venus-common-utils/builder" + gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" + "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/piecestorage" @@ -27,9 +29,9 @@ const ( DefaultDAGStoreDir = "dagstore" ) -// NewMarketAPI creates a new MarketAPI adaptor for the dagstore mounts. -func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager) (MarketAPI, error) { - mountApi := NewMarketAPI(ctx, repo, pieceStorage, r.UseTransient) +// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts. +func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) { + mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return mountApi.Start(ctx) diff --git a/docker/script/compose.sh b/docker/script/compose.sh index 31a612d3..7fe6e55d 100755 --- a/docker/script/compose.sh +++ b/docker/script/compose.sh @@ -20,9 +20,10 @@ fi echo "Compose Int End!" -/app/venus-market pool-run \ +/app/venus-market run \ --node-url=/ip4/127.0.0.1/tcp/3453 \ --auth-url=http://127.0.0.1:8989 \ --gateway-url=/ip4/127.0.0.1/tcp/45132/ \ --messager-url=/ip4/127.0.0.1/tcp/39812/ \ ---auth-token=${token} +--cs-token=${token} \ +--signer-type="gateway" diff --git "a/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" "b/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" index 1093813b..16d38e25 100644 --- "a/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" +++ "b/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" @@ -15,13 +15,12 @@ 存储流程: -| 阶段 | 步骤 | 说明 | -| ---- | ---- | ---- | -| 启动 `venus-market` 和 `market-client` | 1. 配置和启动[venus-market](#配置启动venus-market)
2. 配置和启动[market-client](#配置启动market-client) | | -| 代理 `miners` 的 `libp2p` 监听服务 | 通过 `venus-market` [代理 `miners` 的 `libp2p` 监听服务](#venus-market代理libp2p监听) | | -| 为 `miners` 挂单 | 通过 `venus-market` [挂单](#挂单) | | -| 指定 `miner` 发单 | 1. 通过 `market-client` [导入待存储的数据](#导入待存储的数据)
2. 根据需求[选择合适的挂单](#选择挂单)
3. [发起存储订单](#发起存储订单) | 发单之后,需要一定时间执行订单交易流程,订单被确认后,存储提供商对订单中数据进行封装并提交证明,`venus-market` 负责跟踪订单状态。 - +| 阶段 | 步骤 | 说明 | +|-------------------------------------|----------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------| +| 启动 `venus-market` 和 `market-client` | 1. 配置和启动[venus-market](#配置启动venus-market)
2. 配置和启动[market-client](#配置启动market-client) | | +| 代理 `miners` 的 `libp2p` 监听服务 | 通过 `venus-market` [代理 `miners` 的 `libp2p` 监听服务](#venus-market代理libp2p监听) | | +| 为 `miners` 挂单 | 通过 `venus-market` [挂单](#挂单) | | +| 指定 `miner` 发单 | 1. 通过 `market-client` [导入待存储的数据](#导入待存储的数据)
2. 根据需求[选择合适的挂单](#选择挂单)
3. [发起存储订单](#发起存储订单) | 发单之后,需要一定时间执行订单交易流程,订单被确认后,存储提供商对订单中数据进行封装并提交证明,`venus-market` 负责跟踪订单状态。 | 检索流程: @@ -37,36 +36,48 @@ ### 初始化 -- `pool` 模式 +- 链上模式 -`pool` 模式作为链服务中的一个组件,配合`venus-auth`、`venus`、 `venus-messager`、`venus-gateway`等组件一起为注册到链服务的 `miner` 提供市场服务。**在`venus`体系中建议使用此方式部署.** +作为链服务中的一个组件,配合`venus-auth`、`venus`、 `venus-messager`、`venus-gateway`等组件一起为注册到链服务的 `miner` 提供市场服务。 ``` -./venus-market pool-run \ +./venus-market run \ --node-url=/ip4//tcp/ \ --auth-url=http://: \ --gateway-url=/ip4//tcp/ \ --messager-url=/ip4//tcp/ \ ---auth-token= +--cs-token= \ +--signer-type="gateway" ``` :tipping_hand_woman: **`shared-token` 是用于访问其他链服务组件 `API` 时的权限验证,由 `venus-auth` 管理,需要 `admin` 权限。具体请参考 [venus-auth token](https://github.com/filecoin-project/venus-docs/blob/master/docs/zh/cs/deploy-a-cs.md#user及token生成)** -- `solo` 模式 +- 链下模式 -`solo` 模式独立于链服务外部署,为一个单独的 `miner` 提供市场服务, 需要连接同步节点(`venus/lotus`)和 签名节点(`venus-wallet/lotus-wallet`)方能正常运行。签名节点也可以是同步节点,如`venus fullnode`和`lotus fullnode`导入钱包私钥后可以作为签名节点。 这里介绍配合 `venus fullnode` 的启动方式: +配合 `lotus fullnode` 的启动方式: ``` -./venus-market solo-run \ +./venus-market run \ --node-url=/ip4//tcp/ \ ---node-token= \ ---signer-type=wallet \ +--messager-url=/ip4//tcp/ \ +--cs-token= \ +--signer-type="lotusnode" +``` + +使用链服务和 `venus-wallet` 的启动方式: + +``` +./venus-market run \ +--auth-url=http://: \ +--node-url=/ip4//tcp/ \ +--messager-url=/ip4//tcp/ \ +--cs-token= \ --signer-url=/ip4//tcp/ \ ---signer-token= \ ---miner=: +--signer-token= \ +--signer-type="wallet" ``` -:tipping_hand_woman: **在`venus fullnode`导入钱包私钥时,`signer-url`和`signer-token` 配置和 `node-url`和`node-token`一致,如果签名用独立的`venus-wallet`组件,则配置为 `venus-wallet`的监听地址及具有签名权限的 `token`.** +:tipping_hand_woman: **签名用独立的`venus-wallet`组件,则配置为 `venus-wallet`的监听地址及具有签名权限的 `token`.** `venus-wallet` 生成具有签名权限的 `token`: @@ -86,13 +97,13 @@ $ ./venus-wallet auth api-info --perm=sign *tips:* 修改完配置文件之后需要重启`venus-market`服务: ```bash -$ nohup ./venus-market pool-run > market.log 2>&1 & +$ nohup ./venus-market run > market.log 2>&1 & ``` > 已经生成 `repo` 之后,初始化所需的参数被写入配置文件,故后续启动无需再加。 #### 链服务配置 -- `pool` 模式,包括:同步节点,消息节点,签名节点及授权节点。 +- 包括:同步节点,消息节点,签名节点及授权节点。 ```yuml [Node] @@ -110,27 +121,6 @@ $ nohup ./venus-market pool-run > market.log 2>&1 & Token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1lIjoiemwiLCJwZXJtIjoiYWRtaW4iLCJleHQiOiIifQ.3u-PInSUmX-8f6Z971M7JBCHYgFVQrvwUjJfFY03ouQ" ``` -- `solo` 模式: 通常用`fullnode` 担任链信息同步及消息推送,故只需配置 `[Node]` 和 `[Signer]` - -```yuml -[Node] - Url = "/ip4/127.0.0.1/tcp/3453" - Token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1lIjoiZGVmYXVsdExvY2FsVG9rZW4iLCJwZXJtIjoiYWRtaW4iLCJleHQiOiIifQ.bdnxynriFupkiQ4-6BzRMOsFE08L9zuEZEVB_aaCxLE" - -[Messager] - Url = "" - Token = "" - -[Signer] - Type = "wallet" - Url = "/ip4/127.0.0.1/tcp/3453/http" - Token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1lIjoiZGVmYXVsdExvY2FsVG9rZW4iLCJwZXJtIjoiYWRtaW4iLCJleHQiOiIifQ.bdnxynriFupkiQ4-6BzRMOsFE08L9zuEZEVB_aaCxLE" - -[AuthNode] - Url = "" - Token = "" -``` - #### `API` 监听配置 `venus-market` 默认监听端口为 `127.0.0.1:41235`, 为了支持不同网络的访问请求, 需要修改`API`的监听地址: @@ -181,13 +171,40 @@ PublishMsgPeriod = "10s" #### `StorageMiners` 配置 -`pool` 模式不需要配置,由 `venus-auth` 管理,`solo` 模式需要配置,如下所示: - -```yuml -[[StorageMiners]] - Addr = "t0128788" - Account = "account" -``` +`venus-market` 服务的矿工及每个矿工的参数,配置如下: + +``` +[[Miners]] + Addr = "f01000" + Account = "testuser01" + + ConsiderOnlineStorageDeals = true + ConsiderOfflineStorageDeals = true + ConsiderOnlineRetrievalDeals = true + ConsiderOfflineRetrievalDeals = true + ConsiderVerifiedStorageDeals = true + ConsiderUnverifiedStorageDeals = true + PieceCidBlocklist = [] + ExpectedSealDuration = "24h0m0s" + MaxDealStartDelay = "336h0m0s" + PublishMsgPeriod = "1h0m0s" + MaxDealsPerPublishMsg = 8 + MaxProviderCollateralMultiplier = 2 + Filter = "" + RetrievalFilter = "" + TransferPath = "" + MaxPublishDealsFee = "0 FIL" + MaxMarketBalanceAddFee = "0 FIL" + [CommonProviderConfig.RetrievalPricing] + Strategy = "default" + [CommonProviderConfig.RetrievalPricing.Default] + VerifiedDealsFreeTransfer = true + [CommonProviderConfig.RetrievalPricing.External] + Path = "" + [CommonProviderConfig.AddressConfig] + DisableWorkerFallback = false +``` +如果有多个矿工,将上述配置拷贝一份即可。***如果矿工比较多,那配置文件会很长,考虑优化?*** ## venus-market代理libp2p监听 diff --git a/go.mod b/go.mod index 76d3c8d7..83234477 100644 --- a/go.mod +++ b/go.mod @@ -31,9 +31,9 @@ require ( github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 github.com/filecoin-project/specs-storage v0.4.1 - github.com/filecoin-project/venus v1.7.1-0.20221116064917-aab91a46434e - github.com/filecoin-project/venus-auth v1.8.0-rc1.0.20221101071442-c2ed29a74e9e - github.com/filecoin-project/venus-messager v1.8.0-rc2.0.20221103053206-08c153719d87 + github.com/filecoin-project/venus v1.8.1-0.20221129064655-35d20069eca9 + github.com/filecoin-project/venus-auth v1.8.1-0.20221124050656-73734ff50be5 + github.com/filecoin-project/venus-messager v1.8.1-0.20221124093938-eee70cda6d54 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 @@ -41,7 +41,7 @@ require ( github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 - github.com/ipfs-force-community/venus-gateway v1.8.0-rc1.0.20221102034912-cf73a722a26a + github.com/ipfs-force-community/venus-gateway v1.8.1-0.20221124093814-a16028a23ace github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-blockservice v0.4.0 github.com/ipfs/go-cid v0.2.0 diff --git a/go.sum b/go.sum index 249e88d8..e170f0ec 100644 --- a/go.sum +++ b/go.sum @@ -422,7 +422,6 @@ github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/ github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.12/go.mod h1:n/kujdC9JphvYTrmaD1+vJpvDPy/DwzckoMzP0nBKWI= -github.com/filecoin-project/go-state-types v0.9.7/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw= github.com/filecoin-project/go-state-types v0.9.8 h1:xkdITiR7h691z1tWOhNCJxHI+cq+Mq7ATkpHQ7f1gu8= github.com/filecoin-project/go-state-types v0.9.8/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw= github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= @@ -475,15 +474,14 @@ github.com/filecoin-project/storetheindex v0.4.17 h1:w0dVc954TGPukoVbidlYvn9Xt+w github.com/filecoin-project/storetheindex v0.4.17/go.mod h1:y2dL8C5D3PXi183hdxgGtM8vVYOZ1lg515tpl/D3tN8= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0= -github.com/filecoin-project/venus v1.7.1-0.20221102021930-ac0f60ee4c31/go.mod h1:+QxMl7tEDvNLPt2ZnsGQOKWdmNoWF4ZxrHP+Z4phv/0= -github.com/filecoin-project/venus v1.7.1-0.20221116064917-aab91a46434e h1:bHSvy9208RQc1d3/LsRjZd191slW3PEI55L/bsetPdQ= -github.com/filecoin-project/venus v1.7.1-0.20221116064917-aab91a46434e/go.mod h1:+QxMl7tEDvNLPt2ZnsGQOKWdmNoWF4ZxrHP+Z4phv/0= +github.com/filecoin-project/venus v1.8.1-0.20221124060352-6507f0f062aa/go.mod h1:AnTMHZERLQH9oOSGohEVJAwDfQAHcwLXQuAjjmHIvh4= +github.com/filecoin-project/venus v1.8.1-0.20221129064655-35d20069eca9 h1:GfpGSkdyaBf8qosy8adQ51EFPWFiP86QLC694W7nOaU= +github.com/filecoin-project/venus v1.8.1-0.20221129064655-35d20069eca9/go.mod h1:AnTMHZERLQH9oOSGohEVJAwDfQAHcwLXQuAjjmHIvh4= github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU= -github.com/filecoin-project/venus-auth v1.8.0-rc1/go.mod h1:DR/RdqOycbMyQ4/oKGYMGlh8qwpLw8A40eL1Qbj7KpQ= -github.com/filecoin-project/venus-auth v1.8.0-rc1.0.20221101071442-c2ed29a74e9e h1:KsM5XFZrnoN4JmeRBMwSlpUuexJaNQuRJMjTbZzPcQc= -github.com/filecoin-project/venus-auth v1.8.0-rc1.0.20221101071442-c2ed29a74e9e/go.mod h1:DR/RdqOycbMyQ4/oKGYMGlh8qwpLw8A40eL1Qbj7KpQ= -github.com/filecoin-project/venus-messager v1.8.0-rc2.0.20221103053206-08c153719d87 h1:WBR3DxQkavOPPA3qdy/XRLB1rTW+HdZjQ4aIFzLQ+PI= -github.com/filecoin-project/venus-messager v1.8.0-rc2.0.20221103053206-08c153719d87/go.mod h1:5OasDzCbO9ZbH95SrNG+XWDB5FBaVc41tgMmCjo8nvw= +github.com/filecoin-project/venus-auth v1.8.1-0.20221124050656-73734ff50be5 h1:y5Y6v5lhN3wKNYQkx5EqPh+8f31oe94iSoHIZz7Iado= +github.com/filecoin-project/venus-auth v1.8.1-0.20221124050656-73734ff50be5/go.mod h1:Ckj8F/iuSgXnCb9LvH0IiPR7swJZQAhabDOxVycLGWs= +github.com/filecoin-project/venus-messager v1.8.1-0.20221124093938-eee70cda6d54 h1:eCqkaKpwAZB/zueTuEuxPbSyLjfaIOEy1EW1pW6Um4Y= +github.com/filecoin-project/venus-messager v1.8.1-0.20221124093938-eee70cda6d54/go.mod h1:nx7sbYe48/hznmEjAZ99mRLkvJBtdieNXQrZQX4l/DE= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= @@ -854,8 +852,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go. github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA= -github.com/ipfs-force-community/venus-gateway v1.8.0-rc1.0.20221102034912-cf73a722a26a h1:UKfKybn7vfPaLNp0aBYYp4OWc3OxhhcGY9/+P4s4Q9k= -github.com/ipfs-force-community/venus-gateway v1.8.0-rc1.0.20221102034912-cf73a722a26a/go.mod h1:e6f4ykbl1NG9b2dSWlWJPYHuU7ai799+Mab4CptlzKA= +github.com/ipfs-force-community/venus-gateway v1.8.1-0.20221124093814-a16028a23ace h1:4erg+UuV17y+iol22f7S1tYkAFWh+KHEQNq5dnp35ns= +github.com/ipfs-force-community/venus-gateway v1.8.1-0.20221124093814-a16028a23ace/go.mod h1:hyWeGESVOksjvdi0hFC9z+2WveZr6tHuOtPANpm+fD8= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= @@ -965,6 +963,7 @@ github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalY github.com/ipfs/go-ipfs-cmdkit v0.0.1/go.mod h1:9FtbMdUabcSqv/G4/8WCxSLxkZxn/aZEFrxxqnVcRbg= github.com/ipfs/go-ipfs-cmds v0.6.0/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk= github.com/ipfs/go-ipfs-cmds v0.6.1-0.20220212012746-40b8fddb899f/go.mod h1:y0bflH6m4g6ary4HniYt98UqbrVnRxmRarzeMdLIUn0= +github.com/ipfs/go-ipfs-cmds v0.8.1/go.mod h1:y0bflH6m4g6ary4HniYt98UqbrVnRxmRarzeMdLIUn0= github.com/ipfs/go-ipfs-config v0.18.0/go.mod h1:wz2lKzOjgJeYJa6zx8W9VT7mz+iSd0laBMqS/9wmX6A= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= diff --git a/minermgr/miner_mgr.go b/minermgr/miner_mgr.go index 4fcd9f73..6223d318 100644 --- a/minermgr/miner_mgr.go +++ b/minermgr/miner_mgr.go @@ -3,55 +3,31 @@ package minermgr import ( "context" "sync" - "time" - - logging "github.com/ipfs/go-log/v2" "github.com/ipfs-force-community/metrics" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/venus-auth/auth" - "github.com/filecoin-project/venus-auth/core" - "github.com/filecoin-project/venus-auth/jwtclient" - "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus/venus-shared/types/market" ) -const CoMinersLimit = 500 - -var log = logging.Logger("user-manager") - type MinerMgrImpl struct { - authClient *jwtclient.AuthClient - authNode config.AuthNode - miners map[address.Address]*market.User lk sync.Mutex - - localMiners map[address.Address]*market.User } var _ IMinerMgr = (*MinerMgrImpl)(nil) -func NewMinerMgrImpl(ctx metrics.MetricsCtx, authClient *jwtclient.AuthClient, cfg *config.MarketConfig) (IMinerMgr, error) { +func NewMinerMgrImpl(_ metrics.MetricsCtx, cfg *config.MarketConfig) (IMinerMgr, error) { m := &MinerMgrImpl{ - authClient: authClient, - authNode: cfg.AuthNode, - - miners: make(map[address.Address]*market.User), - localMiners: make(map[address.Address]*market.User), + miners: make(map[address.Address]*market.User), } // storage miner for _, miner := range cfg.Miners { - m.localMiners[address.Address(miner.Addr)] = &market.User{Addr: address.Address(miner.Addr), Account: miner.Account} - } - - if authClient != nil { - go m.refreshUsers(ctx) + m.miners[address.Address(miner.Addr)] = &market.User{Addr: address.Address(miner.Addr), Account: miner.Account} } return m, nil @@ -66,14 +42,10 @@ func (m *MinerMgrImpl) MinerList(context.Context) ([]address.Address, error) { miners = append(miners, miner) } - for miner := range m.localMiners { - miners = append(miners, miner) - } - return miners, nil } -func (m *MinerMgrImpl) ActorList(ctx context.Context) ([]market.User, error) { +func (m *MinerMgrImpl) ActorList(_ context.Context) ([]market.User, error) { m.lk.Lock() defer m.lk.Unlock() @@ -82,79 +54,13 @@ func (m *MinerMgrImpl) ActorList(ctx context.Context) ([]market.User, error) { users = append(users, market.User{Addr: user.Addr, Account: user.Account}) } - for _, user := range m.localMiners { - users = append(users, market.User{Addr: user.Addr, Account: user.Account}) - } - return users, nil } -func (m *MinerMgrImpl) Has(ctx context.Context, mAddr address.Address) bool { +func (m *MinerMgrImpl) Has(_ context.Context, mAddr address.Address) bool { m.lk.Lock() defer m.lk.Unlock() _, ok := m.miners[mAddr] - if ok { - return ok - } - - _, ok = m.localMiners[mAddr] return ok } - -func (m *MinerMgrImpl) getMinerFromVenusAuth(ctx context.Context, skip, limit int64) error { - m.lk.Lock() - defer m.lk.Unlock() - - if m.authClient == nil { - return nil - } - - if limit == 0 { - limit = CoMinersLimit - } - - usersWithMiners, err := m.authClient.ListUsersWithMiners(&auth.ListUsersRequest{ - Page: &core.Page{Skip: skip, Limit: limit}, - State: int(core.UserStateEnabled), - }) - if err != nil { - return err - } - - // To avoid some deleted or disabled ones that cannot be removed, re-add them here - m.miners = make(map[address.Address]*market.User) - - for _, u := range usersWithMiners { - for _, miner := range u.Miners { - addr, err := address.NewFromString(miner.Miner) - if err != nil { - log.Warnf("invalid miner: %s in user: %s", miner.Miner, u.Name) - continue - } - - if _, ok := m.miners[addr]; !ok { - m.miners[addr] = &market.User{Addr: addr, Account: u.Name} - } - } - } - - return nil -} - -func (m *MinerMgrImpl) refreshUsers(ctx context.Context) { - if err := m.getMinerFromVenusAuth(ctx, 0, 0); err != nil { - log.Errorf("first sync users from venus-auth(%s) failed: %s", m.authNode.Url, err) - } - - tm := time.NewTicker(time.Minute) - defer tm.Stop() - - for range tm.C { - log.Infof("sync users from venus-auth, url: %s\n", m.authNode.Url) - - if err := m.getMinerFromVenusAuth(ctx, 0, 0); err != nil { - log.Errorf("users from venus-auth(%s) failed:%s", m.authNode.Url, err.Error()) - } - } -} diff --git a/minermgr/module.go b/minermgr/module.go index 5b156521..9e9efc74 100644 --- a/minermgr/module.go +++ b/minermgr/module.go @@ -10,6 +10,8 @@ import ( marketTypes "github.com/filecoin-project/venus/venus-shared/types/market" ) +// todo 支持动态配置? + type IMinerMgr interface { Has(context.Context, address.Address) bool MinerList(context.Context) ([]address.Address, error) diff --git a/retrievalprovider/event_stream.go b/retrievalprovider/event_stream.go new file mode 100644 index 00000000..e7c0abce --- /dev/null +++ b/retrievalprovider/event_stream.go @@ -0,0 +1,26 @@ +package retrievalprovider + +import ( + "time" + + "github.com/ipfs-force-community/metrics" + + "github.com/filecoin-project/venus-auth/jwtclient" + + "github.com/ipfs-force-community/venus-gateway/marketevent" + "github.com/ipfs-force-community/venus-gateway/types" + "github.com/ipfs-force-community/venus-gateway/validator" + + gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" +) + +func NewMarketEventStream(mCtx metrics.MetricsCtx, authClient *jwtclient.AuthClient) gatewayAPIV2.IMarketEvent { + + marketStream := marketevent.NewMarketEventStream(mCtx, validator.NewMinerValidator(authClient), &types.RequestConfig{ + RequestQueueSize: 30, + RequestTimeout: time.Hour * 7, // wait seven hour to do unseal + ClearInterval: time.Minute * 5, + }) + + return marketStream +} diff --git a/retrievalprovider/modules.go b/retrievalprovider/modules.go index e7d4d7d6..202f0c3d 100644 --- a/retrievalprovider/modules.go +++ b/retrievalprovider/modules.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/venus-market/v2/dealfilter" _ "github.com/filecoin-project/venus-market/v2/network" + gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" types "github.com/filecoin-project/venus/venus-shared/types/market" ) @@ -78,5 +79,8 @@ var RetrievalProviderOpts = func(cfg *config.MarketConfig) builder.Option { builder.Override(new(IRetrievalProvider), NewProvider), // save to metadata /retrievals/provider builder.Override(HandleRetrievalKey, HandleRetrieval), builder.Override(new(config.RetrievalDealFilter), RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg))), + builder.Override(new(gatewayAPIV2.IMarketEvent), NewMarketEventStream), + builder.Override(new(gatewayAPIV2.IMarketClient), builder.From(new(gatewayAPIV2.IMarketEvent))), + builder.Override(new(gatewayAPIV2.IMarketServiceProvider), builder.From(new(gatewayAPIV2.IMarketEvent))), ) } diff --git a/retrievalprovider/provider.go b/retrievalprovider/provider.go index 17863b7b..1cf96ed3 100644 --- a/retrievalprovider/provider.go +++ b/retrievalprovider/provider.go @@ -52,7 +52,8 @@ type RetrievalProvider struct { } // NewProvider returns a new retrieval Provider -func NewProvider(network rmnet.RetrievalMarketNetwork, +func NewProvider( + network rmnet.RetrievalMarketNetwork, dagStore stores.DAGStoreWrapper, dataTransfer network.ProviderDataTransfer, fullNode v1api.FullNode, diff --git a/retrievalprovider/provider_environments.go b/retrievalprovider/provider_environments.go index 23156d5a..f488215a 100644 --- a/retrievalprovider/provider_environments.go +++ b/retrievalprovider/provider_environments.go @@ -5,9 +5,6 @@ import ( "errors" "fmt" - "github.com/filecoin-project/go-fil-markets/stores" - "github.com/filecoin-project/venus-market/v2/models/repo" - types "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/ipfs/go-cid" bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p/core/peer" @@ -17,7 +14,12 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/venus-market/v2/models/repo" + + types "github.com/filecoin-project/venus/venus-shared/types/market" ) // CheckDealParams verifies the given deal params are acceptable @@ -56,7 +58,6 @@ type providerDealEnvironment struct { // to add all blocks to a blockstore that is used to serve retrieval func (pde *providerDealEnvironment) PrepareBlockstore(ctx context.Context, dealID retrievalmarket.DealID, pieceCid cid.Cid) error { // Load the blockstore that has the deal data - // 触发unseal过程 bs, err := pde.p.dagStore.LoadShard(ctx, pieceCid) if err != nil { return fmt.Errorf("failed to load blockstore for piece %s: %w", pieceCid, err)