Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

reduce list sectors time #79

Merged
merged 1 commit into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 95 additions & 73 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
sto "github.com/filecoin-project/specs-storage/storage"
multi "github.com/hashicorp/go-multierror"

"github.com/filecoin-project/venus/app/submodule/apitypes"
"github.com/filecoin-project/venus/pkg/chain"
Expand Down Expand Up @@ -226,7 +227,7 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err
}

// List all staged sector's info in particular states
func (sm *StorageMinerAPI) SectorsInfoListInStates(ctx context.Context, states []api.SectorState, showOnChainInfo bool) ([]api.SectorInfo, error) {
func (sm *StorageMinerAPI) SectorsInfoListInStates(ctx context.Context, states []api.SectorState, showOnChainInfo, skipLog bool) ([]api.SectorInfo, error) {
sectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, err
Expand Down Expand Up @@ -257,89 +258,110 @@ func (sm *StorageMinerAPI) SectorsInfoListInStates(ctx context.Context, states [
}

out := make([]api.SectorInfo, len(sis))
group := multi.Group{}
limit := make(chan struct{}, 1)
if !skipLog {
// increase concurrency and reduce query time
limit = make(chan struct{}, 5)
}

for i, sector := range sis {
deals := make([]abi.DealID, len(sector.Pieces))
for i, piece := range sector.Pieces {
if piece.DealInfo == nil {
continue
sector := sector
i := i
oneSector := func() error {
deals := make([]abi.DealID, len(sector.Pieces))
for i, piece := range sector.Pieces {
if piece.DealInfo == nil {
continue
}
deals[i] = piece.DealInfo.DealID
}
deals[i] = piece.DealInfo.DealID
}

logs, err := sm.LogService.List(sector.SectorNumber)
if err != nil {
return nil, err
}
log := make([]api.SectorLog, len(logs))
for i, l := range logs {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
var log []api.SectorLog
if !skipLog {
logs, err := sm.LogService.List(sector.SectorNumber)
if err != nil {
return err
}
log = make([]api.SectorLog, len(logs))
for i, l := range logs {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
}
}
}
}

sInfo := api.SectorInfo{
SectorID: sector.SectorNumber,
State: api.SectorState(sector.State),
CommD: sector.CommD,
CommR: sector.CommR,
Proof: sector.Proof,
Deals: deals,
Ticket: api.SealTicket{
Value: sector.TicketValue,
Epoch: sector.TicketEpoch,
},
Seed: api.SealSeed{
Value: sector.SeedValue,
Epoch: sector.SeedEpoch,
},
PreCommitMsg: sector.PreCommitMessage,
CommitMsg: sector.CommitMessage,
Retries: sector.InvalidProofs,
ToUpgrade: sm.Miner.IsMarkedForUpgrade(sector.SectorNumber),

LastErr: sector.LastErr,
Log: log,
// on chain info
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}

if showOnChainInfo {
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sector.SectorNumber, types.EmptyTSK)
if err != nil {
return nil, err
sInfo := api.SectorInfo{
SectorID: sector.SectorNumber,
State: api.SectorState(sector.State),
CommD: sector.CommD,
CommR: sector.CommR,
Proof: sector.Proof,
Deals: deals,
Ticket: api.SealTicket{
Value: sector.TicketValue,
Epoch: sector.TicketEpoch,
},
Seed: api.SealSeed{
Value: sector.SeedValue,
Epoch: sector.SeedEpoch,
},
PreCommitMsg: sector.PreCommitMessage,
CommitMsg: sector.CommitMessage,
Retries: sector.InvalidProofs,
ToUpgrade: sm.Miner.IsMarkedForUpgrade(sector.SectorNumber),

LastErr: sector.LastErr,
Log: log,
// on chain info
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
if onChainInfo != nil {
sInfo.SealProof = onChainInfo.SealProof
sInfo.Activation = onChainInfo.Activation
sInfo.Expiration = onChainInfo.Expiration
sInfo.DealWeight = onChainInfo.DealWeight
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
sInfo.InitialPledge = onChainInfo.InitialPledge

ex, err := sm.Full.StateSectorExpiration(ctx, sm.Miner.Address(), sector.SectorNumber, types.EmptyTSK)
if err == nil {
sInfo.OnTime = ex.OnTime
sInfo.Early = ex.Early
} else {
// TODO The official didn't deal with this

if showOnChainInfo {
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sector.SectorNumber, types.EmptyTSK)
if err != nil {
return err
}
if onChainInfo != nil {
sInfo.SealProof = onChainInfo.SealProof
sInfo.Activation = onChainInfo.Activation
sInfo.Expiration = onChainInfo.Expiration
sInfo.DealWeight = onChainInfo.DealWeight
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
sInfo.InitialPledge = onChainInfo.InitialPledge

ex, err := sm.Full.StateSectorExpiration(ctx, sm.Miner.Address(), sector.SectorNumber, types.EmptyTSK)
if err == nil {
sInfo.OnTime = ex.OnTime
sInfo.Early = ex.Early
} else {
// TODO The official didn't deal with this
}
}
}
}

out[i] = sInfo
out[i] = sInfo

return nil
}
limit <- struct{}{}
group.Go(oneSector)
<-limit
}
errs := group.Wait()
close(limit)

return out, nil
return out, errs.ErrorOrNil()
}

func (sm *StorageMinerAPI) SectorsListInStates(ctx context.Context, states []api.SectorState) ([]abi.SectorNumber, error) {
Expand Down
48 changes: 24 additions & 24 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type StorageMiner interface {
SectorsList(context.Context) ([]abi.SectorNumber, error)

// List all staged sector's info in particular states
SectorsInfoListInStates(ctx context.Context, ss []SectorState, showOnChainInfo bool) ([]SectorInfo, error)
SectorsInfoListInStates(ctx context.Context, ss []SectorState, showOnChainInfo, skipLog bool) ([]SectorInfo, error)

// Get summary info of sectors
SectorsSummary(ctx context.Context) (map[SectorState]int, error)
Expand Down Expand Up @@ -165,27 +165,27 @@ type StorageMinerStruct struct {

PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"`

SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsListInStates func(context.Context, []SectorState) ([]abi.SectorNumber, error) `perm:"read"`
SectorsInfoListInStates func(ctx context.Context, ss []SectorState, showOnChainInfo bool) ([]SectorInfo, error) `perm:"read"`
SectorsSummary func(ctx context.Context) (map[SectorState]int, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]types.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"`
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminateFlush func(ctx context.Context) (string, error) `perm:"admin"`
SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
SectorPreCommitFlush func(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
SectorPreCommitPending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorCommitFlush func(ctx context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
SectorCommitPending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsListInStates func(context.Context, []SectorState) ([]abi.SectorNumber, error) `perm:"read"`
SectorsInfoListInStates func(ctx context.Context, ss []SectorState, showOnChainInfo, skipLog bool) ([]SectorInfo, error) `perm:"read"`
SectorsSummary func(ctx context.Context) (map[SectorState]int, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]types.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"`
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminateFlush func(ctx context.Context) (string, error) `perm:"admin"`
SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
SectorPreCommitFlush func(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
SectorPreCommitPending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorCommitFlush func(ctx context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
SectorCommitPending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`

WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"`
Expand Down Expand Up @@ -288,8 +288,8 @@ func (c *StorageMinerStruct) SectorsListInStates(ctx context.Context, states []S
}

// List all staged sector's info in particular states
func (c *StorageMinerStruct) SectorsInfoListInStates(ctx context.Context, ss []SectorState, showOnChainInfo bool) ([]SectorInfo, error) {
return c.Internal.SectorsInfoListInStates(ctx, ss, showOnChainInfo)
func (c *StorageMinerStruct) SectorsInfoListInStates(ctx context.Context, ss []SectorState, showOnChainInfo, skipLog bool) ([]SectorInfo, error) {
return c.Internal.SectorsInfoListInStates(ctx, ss, showOnChainInfo, skipLog)
}

func (c *StorageMinerStruct) SectorsSummary(ctx context.Context) (map[SectorState]int, error) {
Expand Down
10 changes: 7 additions & 3 deletions app/venus-sealer/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/filecoin-project/venus-sealer/api"
"github.com/filecoin-project/venus-sealer/lib/tablewriter"
types2 "github.com/filecoin-project/venus-sealer/types"

)

var sectorsCmd = &cli.Command{
Expand Down Expand Up @@ -220,8 +219,13 @@ var sectorsListCmd = &cli.Command{
}
}

// reduce query time
skipLog := true
if cctx.Bool("events") || cctx.Bool("seal-time") {
skipLog = false
}
fast := cctx.Bool("fast")
list, err = storageAPI.SectorsInfoListInStates(ctx, ss, !fast)
list, err = storageAPI.SectorsInfoListInStates(ctx, ss, !fast, skipLog)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +462,7 @@ var sectorsExtendCmd = &cli.Command{
}
defer closer()

maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210714054928-2042a9040759
github.com/ipfs-force-community/venus-gateway v1.1.0
github.com/ipfs/go-block-format v0.0.3
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,9 @@ github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
Expand Down