Backports: 8986, 8982 #8988

Merged: 2 commits, Jul 7, 2022
16 changes: 16 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
@@ -315,6 +315,22 @@
# env var: LOTUS_PROVING_PARALLELCHECKLIMIT
#ParallelCheckLimit = 128

# Setting this value above the network limit has no effect
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
#MaxPartitionsPerPoStMessage = 0

# In some cases when submitting DeclareFaultsRecovered messages,
# there may be too many recoveries to fit in a BlockGasLimit.
# In those cases it may be necessary to set this value to something low (e.g. 1).
# Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
# resulting in more total gas use (but each message will have a lower gas limit)
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
#MaxPartitionsPerRecoveryMessage = 0


[Sealing]
# Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
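The gas tradeoff described in the new MaxPartitionsPerRecoveryMessage comment above can be made concrete with a small sketch. This is not Lotus code; the helper name and the partition counts are illustrative only.

```go
package main

import "fmt"

// recoveryMessageCount is a hypothetical helper: given the number of
// recovering partitions in a deadline and the MaxPartitionsPerRecoveryMessage
// setting, it returns how many DeclareFaultsRecovered messages would be sent.
// A cap of 0 means "no limit", i.e. everything goes into a single message.
func recoveryMessageCount(partitions, maxPerMsg int) int {
	if partitions == 0 {
		return 0
	}
	if maxPerMsg <= 0 {
		return 1
	}
	// ceiling division: each message carries at most maxPerMsg partitions
	return (partitions + maxPerMsg - 1) / maxPerMsg
}

func main() {
	fmt.Println(recoveryMessageCount(40, 0)) // 1 large message (may not fit in the block gas limit)
	fmt.Println(recoveryMessageCount(40, 1)) // 40 small messages, more total gas but each fits easily
}
```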
2 changes: 1 addition & 1 deletion node/builder_miner.go
@@ -116,7 +116,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(*miner.Miner), modules.SetupBlockProducer),
Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees)),
Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)),
Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))),
),

16 changes: 16 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default.

23 changes: 23 additions & 0 deletions node/config/types.go
@@ -222,6 +222,29 @@ type ProvingConfig struct {
// Maximum number of sector checks to run in parallel. (0 = unlimited)
ParallelCheckLimit int

// Maximum number of partitions to prove in a single SubmitWindowPoSt message. 0 = network limit (10 in nv16)
//
// A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors.
//
// The maximum number of sectors which can be proven in a single PoSt message is 25000 in network version 16, which
// means that a single message can prove at most 10 partitions.
//
// In some cases when submitting PoSt messages which are recovering sectors, the default network limit may still be
// too high to fit in the block gas limit; in those cases it may be necessary to set this value to something lower
// than 10. Note that setting this value lower may result in less efficient gas use - more messages will be sent
// to prove each deadline, resulting in more total gas use (but each message will have a lower gas limit)
//
// Setting this value above the network limit has no effect
MaxPartitionsPerPoStMessage int

// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.
//
// In some cases when submitting DeclareFaultsRecovered messages,
// there may be too many recoveries to fit in a BlockGasLimit.
// In those cases it may be necessary to set this value to something low (e.g. 1).
// Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
// resulting in more total gas use (but each message will have a lower gas limit)
MaxPartitionsPerRecoveryMessage int
// todo disable builtin post
}

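For reference, the "10 partitions" figure in the new doc comments follows directly from the two constants quoted there (2349 32GiB sectors per partition, 25000 sectors per PoSt message in nv16). A minimal sketch of that arithmetic, using only the numbers from the comment rather than values queried from the chain:

```go
package main

import "fmt"

func main() {
	// Numbers are taken from the ProvingConfig doc comment above; this only
	// reproduces the arithmetic behind "at most 10 partitions per message".
	const sectorsPerPartition32GiB = 2349
	const maxSectorsPerPoStMessageNv16 = 25000

	maxPartitionsPerMessage := maxSectorsPerPoStMessageNv16 / sectorsPerPartition32GiB
	fmt.Println(maxPartitionsPerMessage) // 10
}
```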
4 changes: 2 additions & 2 deletions node/modules/storageminer.go
@@ -256,7 +256,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
}
}

func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
return func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
var (
mctx = params.MetricsCtx
@@ -271,7 +271,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParam

ctx := helpers.LifecycleCtx(mctx, lc)

fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, verif, sealer, j, maddr)
fps, err := storage.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr)
if err != nil {
return nil, err
}
137 changes: 76 additions & 61 deletions storage/wdpost_run.go
@@ -261,14 +261,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
defer span.End()

faulty := uint64(0)
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: []miner.RecoveryDeclaration{},
}

var batchedRecoveryDecls [][]miner.RecoveryDeclaration
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
totalRecoveries := 0

for partIdx, partition := range partitions {
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
@@ -302,55 +303,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
continue
}

params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
// respect user config if set
if s.maxPartitionsPerRecoveryMessage > 0 &&
len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
}

batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
Deadline: dlIdx,
Partition: uint64(partIdx),
Sectors: recovered,
})

totalRecoveries++
}

recoveries := params.Recoveries
if len(recoveries) == 0 {
if totalRecoveries == 0 {
if faulty != 0 {
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
}

return recoveries, nil, nil
return nil, nil, nil
}

enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
var msgs []*types.SignedMessage
for _, recovery := range batchedRecoveryDecls {
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: recovery,
}

msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return recoveries, nil, err
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}

sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return nil, nil, err
}

log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
}

rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
msgs = append(msgs, sm)
}

if rec.Receipt.ExitCode != 0 {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
for _, msg := range msgs {
rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
}

if rec.Receipt.ExitCode != 0 {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
}

return recoveries, sm, nil
return batchedRecoveryDecls, msgs, nil
}

// declareFaults identifies the sectors on the specified proving deadline that
@@ -464,9 +482,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}

var (
sigmsg *types.SignedMessage
recoveries []miner.RecoveryDeclaration
faults []miner.FaultDeclaration
sigmsgs []*types.SignedMessage
recoveries [][]miner.RecoveryDeclaration

// optionalCid returns the CID of the message, or cid.Undef if the
// message is nil. We don't need the argument (could capture the
Expand All @@ -479,37 +496,28 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}
)

if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}

s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recoveries,
MessageCID: optionalCid(sigmsg),
// the lengths should always match; skip journaling if they don't for some reason
if len(recoveries) == len(sigmsgs) {
for i, recovery := range recoveries {
// clone for function literal
recovery := recovery
msgCID := optionalCid(sigmsgs[i])
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recovery,
MessageCID: msgCID,
}
j.Error = err
return j
})
}
j.Error = err
return j
})

if ts.Height() > build.UpgradeIgnitionHeight {
return // FORK: declaring faults after ignition upgrade makes no sense
}

if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}

s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return WdPoStFaultsProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: faults,
MessageCID: optionalCid(sigmsg),
}
})
}()
}

@@ -778,6 +786,13 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
partitionsPerMsg = declMax
}

// respect user config if set
if s.maxPartitionsPerPostMessage > 0 {
if partitionsPerMsg > s.maxPartitionsPerPostMessage {
partitionsPerMsg = s.maxPartitionsPerPostMessage
}
}

// The number of messages will be:
// ceiling(number of partitions / partitions per message)
batchCount := len(partitions) / partitionsPerMsg
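A simplified sketch of how the new user cap interacts with the batch count computed in batchPartitions above. The helper below is illustrative only, not the actual implementation; it assumes the ceiling is taken exactly as in the snippet shown.

```go
package main

import "fmt"

// batchCountSketch mirrors the logic shown above: start from the network
// partition limit, lower it to MaxPartitionsPerPoStMessage if the user set a
// positive cap, then take the ceiling of partitions / partitions-per-message.
func batchCountSketch(numPartitions, networkLimit, userCap int) int {
	perMsg := networkLimit
	if userCap > 0 && perMsg > userCap {
		perMsg = userCap
	}
	count := numPartitions / perMsg
	if numPartitions%perMsg != 0 {
		count++ // round up so every partition lands in some message
	}
	return count
}

func main() {
	fmt.Println(batchCountSketch(15, 10, 0)) // 2 messages with no user cap
	fmt.Println(batchCountSketch(15, 10, 4)) // 4 messages with MaxPartitionsPerPoStMessage = 4
}
```

For example, 15 partitions at the nv16 network limit of 10 and no cap yields 2 SubmitWindowPoSt messages; setting MaxPartitionsPerPoStMessage = 4 yields 4 smaller messages, each with a lower gas limit.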