Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Feat/support oss storage #147

Merged
merged 5 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func GetFullNodeAPIV2(cctx *cli.Context) (FullNode, jsonrpc.ClientCloser, error)
return NewFullNodeRPC(cctx.Context, addr, apiInfo.AuthHeader())
}

func GetFullNodeFromNodeConfig(ctx context.Context, cfg *config.NodeConfig) (FullNode, jsonrpc.ClientCloser, error) {
apiInfo := apiinfo.NewAPIInfo(cfg.Url, cfg.Token)
addr, err := apiInfo.DialArgs("v1")
if err != nil {
return nil, nil, xerrors.Errorf("could not get DialArgs: %w", err)
}
return NewFullNodeRPC(ctx, addr, apiInfo.AuthHeader())
}

func GetFullNodeAPIFromConfig(cctx *cli.Context) (apiinfo.APIInfo, error) {
repoPath := cctx.String("repo")
cfgPath := config.FsConfig(repoPath)
Expand Down
2 changes: 2 additions & 0 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package impl
import (
"context"
"encoding/json"
api2 "github.com/filecoin-project/venus-market/api"
types4 "github.com/filecoin-project/venus-market/types"
"net/http"
"strconv"
"time"
Expand Down
1 change: 1 addition & 0 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
types4 "github.com/filecoin-project/venus-market/types"
"time"

"github.com/google/uuid"
Expand Down
114 changes: 77 additions & 37 deletions app/venus-sealer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/filecoin-project/venus-market/piecestorage"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -130,20 +131,28 @@ var initCmd = &cli.Command{
Usage: "gateway token",
},

&cli.StringFlag{
Name: "market-url",
Usage: "market url",
},
&cli.StringFlag{
Name: "market-token",
Usage: "market token",
},

&cli.StringFlag{
Name: "auth-token",
Usage: "auth token",
},

&cli.StringFlag{
Name: "piecestorage",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}",
},
},
Action: func(cctx *cli.Context) error {
ctx := api.ReqContext(cctx)
log.Info("Initializing venus sealer")

sectorSizeInt, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return err
}
ssize := abi.SectorSize(sectorSizeInt)

gasPrice, err := types.BigFromString(cctx.String("gas-premium"))
if err != nil {
return xerrors.Errorf("failed to parse gas-price flag: %s", err)
Expand All @@ -160,8 +169,11 @@ var initCmd = &cli.Command{
return err
}

setAuthToken(cctx)
parseFlag(defaultCfg, cctx)
setAuthToken(defaultCfg, cctx)
err = parseFlag(defaultCfg, cctx)
if err != nil {
return err
}
if err := checkURL(defaultCfg); err != nil {
return err
}
Expand All @@ -176,14 +188,9 @@ var initCmd = &cli.Command{
return err
}

ctx := api.ReqContext(cctx)
if err := paramfetch.GetParams(ctx, ps, srs, uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

log.Info("Trying to connect to full node RPC")

fullNode, closer, err := api.GetFullNodeAPIV2(cctx) // TODO: consider storing full node address in config
fullNode, closer, err := api.GetFullNodeFromNodeConfig(ctx, &defaultCfg.Node) // TODO: consider storing full node address in config
if err != nil {
return err
}
Expand Down Expand Up @@ -282,7 +289,12 @@ var initCmd = &cli.Command{
}
}

if err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, ssize, gasPrice); err != nil {
ssize, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return fmt.Errorf("failed to parse sector size: %w", err)
}
minerAddr, err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, abi.SectorSize(ssize), gasPrice)
if err != nil {
log.Errorf("Failed to initialize venus-miner: %+v", err)
path, err := homedir.Expand(defaultCfg.DataDir)
if err != nil {
Expand All @@ -295,23 +307,31 @@ var initCmd = &cli.Command{
return xerrors.Errorf("Storage-miner init failed")
}

minerInfo, err := fullNode.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Sealer successfully created, you can now start it with 'venus-sealer run'")

if err := paramfetch.GetParams(ctx, ps, srs, uint64(minerInfo.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
return nil
},
}

func setAuthToken(cctx *cli.Context) {
func setAuthToken(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("auth-token") {
authToken := cctx.String("auth-token")
_ = cctx.Set("node-token", authToken)
_ = cctx.Set("messager-token", authToken)
_ = cctx.Set("gateway-token", authToken)
cfg.Node.Token = authToken
cfg.Messager.Token = authToken
cfg.RegisterProof.Token = authToken
cfg.RegisterMarket.Token = authToken
cfg.RegisterMarket.Token = authToken
}
}

func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) error {
cfg.DataDir = cctx.String("repo")

if cctx.IsSet("messager-url") {
Expand All @@ -326,6 +346,11 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
cfg.RegisterProof.Urls = cctx.StringSlice("gateway-url")
}

if cctx.IsSet("market-url") {
cfg.RegisterMarket.Urls = []string{cctx.String("market-url")}
cfg.Market.Url = cctx.String("market-url")
}

if cctx.IsSet("node-token") {
cfg.Node.Token = cctx.String("node-token")
}
Expand All @@ -337,6 +362,21 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("gateway-token") {
cfg.RegisterProof.Token = cctx.String("gateway-token")
}

if cctx.IsSet("market-token") {
cfg.Market.Token = cctx.String("market-token")
cfg.RegisterMarket.Token = cctx.String("market-token")
}

if cctx.IsSet("piecestorage") {
pieceStorage, err := piecestorage.ParserProtocol(cctx.String("piecestorage"))
if err != nil {
return err
}

cfg.PieceStorage = pieceStorage
}
return nil
}

func parseMultiAddr(url string) error {
Expand Down Expand Up @@ -367,76 +407,76 @@ func checkURL(cfg *config.StorageMiner) error {
return nil
}

func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) error {
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) (address.Address, error) {
log.Info("Initializing libp2p identity")

repo, err := models.SetDataBase(config.HomeDir(cfg.DataDir), &cfg.DB)
if err != nil {
return err
return address.Undef, err
}
err = repo.AutoMigrate()
if err != nil {
return err
return address.Undef, err
}

metaDataService := service.NewMetadataService(repo)
sectorInfoService := service.NewSectorInfoService(repo)
p2pSk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return xerrors.Errorf("make host key: %w", err)
return address.Undef, xerrors.Errorf("make host key: %w", err)
}

peerid, err := peer.IDFromPrivateKey(p2pSk)
if err != nil {
return xerrors.Errorf("peer ID from private key: %w", err)
return address.Undef, xerrors.Errorf("peer ID from private key: %w", err)
}

var addr address.Address
if act := cctx.String("actor"); act != "" {
a, err := address.NewFromString(act)
if err != nil {
return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
return address.Undef, xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
}

if cctx.Bool("genesis-miner") {
if err := metaDataService.SaveMinerAddress(a); err != nil {
return err
return address.Undef, err
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

return nil
return a, nil
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

addr = a
} else {
a, err := createStorageMiner(ctx, api, messagerClient, peerid, gasPrice, cctx)
if err != nil {
return xerrors.Errorf("creating miner failed: %w", err)
return address.Undef, xerrors.Errorf("creating miner failed: %w", err)
}

addr = a
Expand All @@ -445,10 +485,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
log.Infof("Created new miner: %s", addr)

if err := metaDataService.SaveMinerAddress(addr); err != nil {
return err
return address.Undef, err
}

return nil
return addr, nil
}

func createStorageMiner(ctx context.Context, nodeAPI api.FullNode, messagerClient api.IMessager, peerid peer.ID, gasPrice types.BigInt, cctx *cli.Context) (address.Address, error) {
Expand Down
8 changes: 8 additions & 0 deletions app/venus-sealer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var stateList = []stateMeta{

{col: color.FgBlue, state: types.Empty},
{col: color.FgBlue, state: types.WaitDeals},
{col: color.FgBlue, state: types.AddPiece},

{col: color.FgRed, state: types.UndefinedSectorState},
{col: color.FgYellow, state: types.Packing},
Expand All @@ -96,10 +97,15 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: types.PreCommit2},
{col: color.FgYellow, state: types.PreCommitting},
{col: color.FgYellow, state: types.PreCommitWait},
{col: color.FgYellow, state: types.SubmitPreCommitBatch},
{col: color.FgYellow, state: types.PreCommitBatchWait},
{col: color.FgYellow, state: types.WaitSeed},
{col: color.FgYellow, state: types.Committing},
{col: color.FgYellow, state: types.CommitFinalize},
{col: color.FgYellow, state: types.SubmitCommit},
{col: color.FgYellow, state: types.CommitWait},
{col: color.FgYellow, state: types.SubmitCommitAggregate},
{col: color.FgYellow, state: types.CommitAggregateWait},
{col: color.FgYellow, state: types.FinalizeSector},

{col: color.FgCyan, state: types.Terminating},
Expand All @@ -110,11 +116,13 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: types.Removed},

{col: color.FgRed, state: types.FailedUnrecoverable},
{col: color.FgRed, state: types.AddPieceFailed},
{col: color.FgRed, state: types.SealPreCommit1Failed},
{col: color.FgRed, state: types.SealPreCommit2Failed},
{col: color.FgRed, state: types.PreCommitFailed},
{col: color.FgRed, state: types.ComputeProofFailed},
{col: color.FgRed, state: types.CommitFailed},
{col: color.FgRed, state: types.CommitFinalizeFailed},
{col: color.FgRed, state: types.PackingFailed},
{col: color.FgRed, state: types.FinalizeFailed},
{col: color.FgRed, state: types.Faulty},
Expand Down
4 changes: 4 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
api2 "github.com/filecoin-project/venus-market/api"
config2 "github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-sealer/api"
"github.com/filecoin-project/venus-sealer/api/impl"
"github.com/filecoin-project/venus-sealer/config"
Expand Down Expand Up @@ -167,6 +169,7 @@ func Repo(cfg *config.StorageMiner) Option {
Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), AddressSelector(&cfg.Addresses)),
Override(new(*config.DbConfig), &cfg.DB),
Override(new(*config2.PieceStorage), &cfg.PieceStorage),
Override(new(*config.StorageMiner), cfg),
Override(new(*config.MessagerConfig), &cfg.Messager),
Override(new(*config.MarketConfig), &cfg.Market),
Expand All @@ -176,6 +179,7 @@ func Repo(cfg *config.StorageMiner) Option {

Override(new(api.IMessager), api.NewMessageRPC),
Override(new(api2.MarketFullNode), api.NewMarketRPC),
Override(new(piecestorage.IPieceStorage), NewPieceStorage),
Override(new(*market_client.MarketEventClient), market_client.NewMarketEventClient),
Override(new(*proof_client.ProofEventClient), proof_client.NewProofEventClient),
Override(new(repo.Repo), models.SetDataBase),
Expand Down
Loading