From 673f9238be6c77529cdb05cf9c73aa6570530746 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 7 Jul 2022 12:33:40 +0200
Subject: [PATCH 1/2] feat: wdpost: Config for maximum partition count per message

---
 .../en/default-lotus-miner-config.toml | 6 ++
 node/builder_miner.go | 2 +-
 node/config/doc_gen.go | 6 ++
 node/config/types.go | 14 +++
 node/modules/storageminer.go | 4 +-
 storage/wdpost_run.go | 7 ++
 storage/wdpost_run_test.go | 99 +++++++++++++++++++
 storage/wdpost_sched.go | 37 +++----
 8 files changed, 155 insertions(+), 20 deletions(-)

diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml
index 1e1b0369d46..276e1dd5f2c 100644
--- a/documentation/en/default-lotus-miner-config.toml
+++ b/documentation/en/default-lotus-miner-config.toml
@@ -315,6 +315,12 @@
   # env var: LOTUS_PROVING_PARALLELCHECKLIMIT
   #ParallelCheckLimit = 128
 
+  # Setting this value above the network limit has no effect
+  #
+  # type: int
+  # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
+  #MaxPartitionsPerMessage = 0
+
 [Sealing]
   # Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
diff --git a/node/builder_miner.go b/node/builder_miner.go
index 2223d14ce7f..3bce6328103 100644
--- a/node/builder_miner.go
+++ b/node/builder_miner.go
@@ -116,7 +116,7 @@ func ConfigStorageMiner(c interface{}) Option {
 			Override(new(*miner.Miner), modules.SetupBlockProducer),
 			Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
 			Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
-			Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees)),
+			Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)),
 
 			Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))),
 		),
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index cf51fb13e2a..4a90cfcc64d 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -629,6 +629,12 @@ over the worker address if this flag is set.`,
 
 			Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
 		},
+		{
+			Name: "MaxPartitionsPerMessage",
+			Type: "int",
+
+			Comment: `Setting this value above the network limit has no effect`,
+		},
 	},
 	"Pubsub": []DocField{
 		{
diff --git a/node/config/types.go b/node/config/types.go
index b5b1fae7ed4..bc1098b7444 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -223,6 +223,20 @@ type ProvingConfig struct {
 	ParallelCheckLimit int
 
 	// todo disable builtin post
+	// Maximum number of partitions to prove in a single SubmitWindowPoSt message. 0 = network limit (10 in nv16)
+	//
+	// A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors.
+	//
+	// The maximum number of sectors which can be proven in a single PoSt message is 25000 in network version 16, which
+	// means that a single message can prove at most 10 partitions
+	//
+	// In some cases when submitting PoSt messages which are recovering sectors, the default network limit may still be
+	// too high to fit in the block gas limit; In those cases it may be necessary to set this value to something lower
+	// than 10; Note that setting this value lower may result in less efficient gas use - more messages will be sent,
+	// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
+	//
+	// Setting this value above the network limit has no effect
+	MaxPartitionsPerMessage int
 }
 
 type SealingConfig struct {
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index 09b8b6f316c..5ac6070fe55 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -256,7 +256,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
 	}
 }
 
-func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
+func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
 	return func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
 		var (
 			mctx = params.MetricsCtx
@@ -271,7 +271,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParam
 
 		ctx := helpers.LifecycleCtx(mctx, lc)
 
-		fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, verif, sealer, j, maddr)
+		fps, err := storage.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr)
 		if err != nil {
 			return nil, err
 		}
diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go
index 916c5a905d6..d8c324865a7 100644
--- a/storage/wdpost_run.go
+++ b/storage/wdpost_run.go
@@ -778,6 +778,13 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
 		partitionsPerMsg = declMax
 	}
 
+	// respect user config if set
+	if s.maxPartitionsPerMessage > 0 {
+		if partitionsPerMsg > s.maxPartitionsPerMessage {
+			partitionsPerMsg = s.maxPartitionsPerMessage
+		}
+	}
+
 	// The number of messages will be:
 	// ceiling(number of partitions / partitions per message)
 	batchCount := len(partitions) / partitionsPerMsg
diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go
index 6efb3e54769..dcb5e9525ff 100644
--- a/storage/wdpost_run_test.go
+++ b/storage/wdpost_run_test.go
@@ -275,6 +275,105 @@ func TestWDPostDoPost(t *testing.T) {
 	}
 }
 
+// TestWDPostDoPost verifies that doPost will send the correct number of window
+// PoST messages for a given number of partitions based on user config
+func TestWDPostDoPostPartLimitConfig(t *testing.T) {
+	//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
+	//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
+	//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
+	//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
+	ctx := context.Background()
+	expectedMsgCount := 364
+
+	proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
+	postAct := tutils.NewIDAddr(t, 100)
+
+	mockStgMinerAPI := newMockStorageMinerAPI()
+
+	// Get the number of sectors allowed in a partition for this proof type
+	sectorsPerPartition,
err := builtin.PoStProofWindowPoStPartitionSectors(proofType) + require.NoError(t, err) + // Work out the number of partitions that can be included in a message + // without exceeding the message sector limit + + //stm: @BLOCKCHAIN_POLICY_GET_MAX_POST_PARTITIONS_001 + partitionsPerMsg, err := policy.GetMaxPoStPartitions(network.Version13, proofType) + require.NoError(t, err) + if partitionsPerMsg > minertypes.AddressedPartitionsMax { + partitionsPerMsg = minertypes.AddressedPartitionsMax + } + + partitionCount := 4 * partitionsPerMsg + + // Assert that user config is less than network limit + userPartLimit := 33 + lastMsgParts := 21 + require.Greater(t, partitionCount, userPartLimit) + + // Assert that we consts are correct + require.Equal(t, (expectedMsgCount-1)*userPartLimit+lastMsgParts, 4*partitionsPerMsg) + + var partitions []api.Partition + for p := 0; p < partitionCount; p++ { + sectors := bitfield.New() + for s := uint64(0); s < sectorsPerPartition; s++ { + sectors.Set(s) + } + partitions = append(partitions, api.Partition{ + AllSectors: sectors, + FaultySectors: bitfield.New(), + RecoveringSectors: bitfield.New(), + LiveSectors: sectors, + ActiveSectors: sectors, + }) + } + mockStgMinerAPI.setPartitions(partitions) + + // Run window PoST + scheduler := &WindowPoStScheduler{ + api: mockStgMinerAPI, + prover: &mockProver{}, + verifier: &mockVerif{}, + faultTracker: &mockFaultTracker{}, + proofType: proofType, + actor: postAct, + journal: journal.NilJournal(), + addrSel: &ctladdr.AddressSelector{}, + + maxPartitionsPerMessage: userPartLimit, + } + + di := &dline.Info{ + WPoStPeriodDeadlines: minertypes.WPoStPeriodDeadlines, + WPoStProvingPeriod: minertypes.WPoStProvingPeriod, + WPoStChallengeWindow: minertypes.WPoStChallengeWindow, + WPoStChallengeLookback: minertypes.WPoStChallengeLookback, + FaultDeclarationCutoff: minertypes.FaultDeclarationCutoff, + } + ts := mockTipSet(t) + + scheduler.startGeneratePoST(ctx, ts, di, func(posts []minertypes.SubmitWindowedPoStParams, err error) { + scheduler.startSubmitPoST(ctx, ts, di, posts, func(err error) {}) + }) + + // Read the window PoST messages + for i := 0; i < expectedMsgCount; i++ { + msg := <-mockStgMinerAPI.pushedMessages + require.Equal(t, builtin.MethodsMiner.SubmitWindowedPoSt, msg.Method) + var params minertypes.SubmitWindowedPoStParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + + if i == expectedMsgCount-1 { + // In the last message we only included a 21 partitions + require.Len(t, params.Partitions, lastMsgParts) + } else { + // All previous messages should include the full number of partitions + require.Len(t, params.Partitions, userPartLimit) + } + } +} + func mockTipSet(t *testing.T) *types.TipSet { minerAct := tutils.NewActorAddr(t, "miner") c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 53801e36212..8977056463c 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -30,15 +30,16 @@ import ( // WindowPoStScheduler watches the chain though the changeHandler, which in turn // turn calls the scheduler when the time arrives to do work. 
type WindowPoStScheduler struct { - api fullNodeFilteredAPI - feeCfg config.MinerFeeConfig - addrSel *AddressSelector - prover storage.Prover - verifier ffiwrapper.Verifier - faultTracker sectorstorage.FaultTracker - proofType abi.RegisteredPoStProof - partitionSectors uint64 - ch *changeHandler + api fullNodeFilteredAPI + feeCfg config.MinerFeeConfig + addrSel *AddressSelector + prover storage.Prover + verifier ffiwrapper.Verifier + faultTracker sectorstorage.FaultTracker + proofType abi.RegisteredPoStProof + partitionSectors uint64 + maxPartitionsPerMessage int + ch *changeHandler actor address.Address @@ -52,6 +53,7 @@ type WindowPoStScheduler struct { // NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler. func NewWindowedPoStScheduler(api fullNodeFilteredAPI, cfg config.MinerFeeConfig, + pcfg config.ProvingConfig, as *AddressSelector, sp storage.Prover, verif ffiwrapper.Verifier, @@ -64,14 +66,15 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI, } return &WindowPoStScheduler{ - api: api, - feeCfg: cfg, - addrSel: as, - prover: sp, - verifier: verif, - faultTracker: ft, - proofType: mi.WindowPoStProofType, - partitionSectors: mi.WindowPoStPartitionSectors, + api: api, + feeCfg: cfg, + addrSel: as, + prover: sp, + verifier: verif, + faultTracker: ft, + proofType: mi.WindowPoStProofType, + partitionSectors: mi.WindowPoStPartitionSectors, + maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage, actor: actor, evtTypes: [...]journal.EventType{ From d68a60e8d9f53fee6a0251ba004fe47618f436e9 Mon Sep 17 00:00:00 2001 From: Aayush Date: Thu, 7 Jul 2022 10:52:22 -0400 Subject: [PATCH 2/2] feat: recovery: Config for maximum partition count per message --- .../en/default-lotus-miner-config.toml | 14 +- node/config/doc_gen.go | 12 +- node/config/types.go | 13 +- storage/wdpost_run.go | 136 +++++++++--------- storage/wdpost_run_test.go | 97 ++++++++++++- storage/wdpost_sched.go | 44 +++--- 6 files changed, 223 insertions(+), 93 deletions(-) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 276e1dd5f2c..c10a4f4768a 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -318,8 +318,18 @@ # Setting this value above the network limit has no effect # # type: int - # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE - #MaxPartitionsPerMessage = 0 + # env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE + #MaxPartitionsPerPoStMessage = 0 + + # In some cases when submitting DeclareFaultsRecovered messages, + # there may be too many recoveries to fit in a BlockGasLimit. + # In those cases it may be necessary to set this value to something low (eg 1); + # Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + # resulting in more total gas use (but each message will have lower gas limit) + # + # type: int + # env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE + #MaxPartitionsPerRecoveryMessage = 0 [Sealing] diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 4a90cfcc64d..8b8b149995f 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -630,11 +630,21 @@ over the worker address if this flag is set.`, Comment: `Maximum number of sector checks to run in parallel. 
(0 = unlimited)`, }, { - Name: "MaxPartitionsPerMessage", + Name: "MaxPartitionsPerPoStMessage", Type: "int", Comment: `Setting this value above the network limit has no effect`, }, + { + Name: "MaxPartitionsPerRecoveryMessage", + Type: "int", + + Comment: `In some cases when submitting DeclareFaultsRecovered messages, +there may be too many recoveries to fit in a BlockGasLimit. +In those cases it may be necessary to set this value to something low (eg 1); +Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, +resulting in more total gas use (but each message will have lower gas limit)`, + }, }, "Pubsub": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index bc1098b7444..ecced55144c 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -222,7 +222,6 @@ type ProvingConfig struct { // Maximum number of sector checks to run in parallel. (0 = unlimited) ParallelCheckLimit int - // todo disable builtin post // Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16) // // A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors. @@ -236,7 +235,17 @@ type ProvingConfig struct { // to prove each deadline, resulting in more total gas use (but each message will have lower gas limit) // // Setting this value above the network limit has no effect - MaxPartitionsPerMessage int + MaxPartitionsPerPoStMessage int + + // Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit. + + // In some cases when submitting DeclareFaultsRecovered messages, + // there may be too many recoveries to fit in a BlockGasLimit. + // In those cases it may be necessary to set this value to something low (eg 1); + // Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + // resulting in more total gas use (but each message will have lower gas limit) + MaxPartitionsPerRecoveryMessage int + // todo disable builtin post } type SealingConfig struct { diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index d8c324865a7..2c0a89d6c07 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -261,14 +261,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B // TODO: the waiting should happen in the background. Right now this // is blocking/delaying the actual generation and submission of WindowPoSts in // this deadline! 
-func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { +func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) { ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries") defer span.End() faulty := uint64(0) - params := &miner.DeclareFaultsRecoveredParams{ - Recoveries: []miner.RecoveryDeclaration{}, - } + + var batchedRecoveryDecls [][]miner.RecoveryDeclaration + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + totalRecoveries := 0 for partIdx, partition := range partitions { unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors) @@ -302,55 +303,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6 continue } - params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{ + // respect user config if set + if s.maxPartitionsPerRecoveryMessage > 0 && + len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage { + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + } + + batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{ Deadline: dlIdx, Partition: uint64(partIdx), Sectors: recovered, }) + + totalRecoveries++ } - recoveries := params.Recoveries - if len(recoveries) == 0 { + if totalRecoveries == 0 { if faulty != 0 { log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty) } - return recoveries, nil, nil + return nil, nil, nil } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) - } + var msgs []*types.SignedMessage + for _, recovery := range batchedRecoveryDecls { + params := &miner.DeclareFaultsRecoveredParams{ + Recoveries: recovery, + } - msg := &types.Message{ - To: s.actor, - Method: builtin.MethodsMiner.DeclareFaultsRecovered, - Params: enc, - Value: types.NewInt(0), - } - spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.prepareMessage(ctx, msg, spec); err != nil { - return recoveries, nil, err - } + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + } - sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) - if err != nil { - return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err) - } + msg := &types.Message{ + To: s.actor, + Method: builtin.MethodsMiner.DeclareFaultsRecovered, + Params: enc, + Value: types.NewInt(0), + } + spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} + if err := s.prepareMessage(ctx, msg, spec); err != nil { + return nil, nil, err + } - log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) + sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) + if err != nil { + return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err) + } - rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, 
api.LookbackNoLimit, true) - if err != nil { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err) + log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) + msgs = append(msgs, sm) } - if rec.Receipt.ExitCode != 0 { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) + for _, msg := range msgs { + rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) + if err != nil { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err) + } + + if rec.Receipt.ExitCode != 0 { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) + } } - return recoveries, sm, nil + return batchedRecoveryDecls, msgs, nil } // declareFaults identifies the sectors on the specified proving deadline that @@ -464,9 +482,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } var ( - sigmsg *types.SignedMessage - recoveries []miner.RecoveryDeclaration - faults []miner.FaultDeclaration + sigmsgs []*types.SignedMessage + recoveries [][]miner.RecoveryDeclaration // optionalCid returns the CID of the message, or cid.Undef is the // message is nil. We don't need the argument (could capture the @@ -479,37 +496,28 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } ) - if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { + if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse log.Errorf("checking sector recoveries: %v", err) } - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { - j := WdPoStRecoveriesProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: recoveries, - MessageCID: optionalCid(sigmsg), + // should always be true, skip journaling if not for some reason + if len(recoveries) == len(sigmsgs) { + for i, recovery := range recoveries { + // clone for function literal + recovery := recovery + msgCID := optionalCid(sigmsgs[i]) + s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { + j := WdPoStRecoveriesProcessedEvt{ + evtCommon: s.getEvtCommon(err), + Declarations: recovery, + MessageCID: msgCID, + } + j.Error = err + return j + }) } - j.Error = err - return j - }) - - if ts.Height() > build.UpgradeIgnitionHeight { - return // FORK: declaring faults after ignition upgrade makes no sense } - - if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { - // TODO: This is also potentially really bad, but we try to post anyways - log.Errorf("checking sector faults: %v", err) - } - - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} { - return WdPoStFaultsProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: faults, - MessageCID: optionalCid(sigmsg), - } - }) }() } @@ -779,9 +787,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net } // respect user config if set - if s.maxPartitionsPerMessage > 0 { - if partitionsPerMsg > s.maxPartitionsPerMessage { - partitionsPerMsg = s.maxPartitionsPerMessage + if s.maxPartitionsPerPostMessage > 0 { + if partitionsPerMsg > 
s.maxPartitionsPerPostMessage { + partitionsPerMsg = s.maxPartitionsPerPostMessage } } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index dcb5e9525ff..51dc5782ffc 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -275,7 +275,7 @@ func TestWDPostDoPost(t *testing.T) { } } -// TestWDPostDoPost verifies that doPost will send the correct number of window +// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window // PoST messages for a given number of partitions based on user config func TestWDPostDoPostPartLimitConfig(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, @@ -338,9 +338,8 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { proofType: proofType, actor: postAct, journal: journal.NilJournal(), - addrSel: &ctladdr.AddressSelector{}, - maxPartitionsPerMessage: userPartLimit, + maxPartitionsPerPostMessage: userPartLimit, } di := &dline.Info{ @@ -374,6 +373,98 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { } } +// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of +// DeclareFaultsRecovered messages for a given number of partitions based on user config +func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) { + //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, + //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 + //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 + //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 + ctx := context.Background() + + proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1 + postAct := tutils.NewIDAddr(t, 100) + + mockStgMinerAPI := newMockStorageMinerAPI() + + // Get the number of sectors allowed in a partition for this proof type + sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType) + require.NoError(t, err) + + // Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message + userPartLimit := 3 + partitionCount := 20 + faultyPartitionCount := 11 + + var partitions []api.Partition + for p := 0; p < partitionCount; p++ { + sectors := bitfield.New() + for s := uint64(0); s < sectorsPerPartition; s++ { + sectors.Set(s) + } + + partition := api.Partition{ + AllSectors: sectors, + FaultySectors: bitfield.New(), + RecoveringSectors: bitfield.New(), + LiveSectors: sectors, + ActiveSectors: sectors, + } + + if p < faultyPartitionCount { + partition.FaultySectors = sectors + } + + partitions = append(partitions, partition) + } + + mockStgMinerAPI.setPartitions(partitions) + + // Run declareRecoverios + scheduler := &WindowPoStScheduler{ + api: mockStgMinerAPI, + prover: &mockProver{}, + verifier: &mockVerif{}, + faultTracker: &mockFaultTracker{}, + proofType: proofType, + actor: postAct, + journal: journal.NilJournal(), + + maxPartitionsPerRecoveryMessage: userPartLimit, + } + + di := uint64(0) + ts := mockTipSet(t) + + expectedMsgCount := faultyPartitionCount/userPartLimit + 1 + lastMsgParts := faultyPartitionCount % userPartLimit + + go func() { + batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key()) + require.NoError(t, err, "failed to declare recoveries") + require.Equal(t, len(batchedRecoveries), len(msgs)) + require.Equal(t, expectedMsgCount, len(msgs)) + }() + + // Read the window 
PoST messages + for i := 0; i < expectedMsgCount; i++ { + msg := <-mockStgMinerAPI.pushedMessages + require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method) + var params minertypes.DeclareFaultsRecoveredParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + + if i == expectedMsgCount-1 { + // In the last message we only included a 21 partitions + require.Len(t, params.Recoveries, lastMsgParts) + } else { + // All previous messages should include the full number of partitions + require.Len(t, params.Recoveries, userPartLimit) + } + + } +} + func mockTipSet(t *testing.T) *types.TipSet { minerAct := tutils.NewActorAddr(t, "miner") c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 8977056463c..c9a791b4fd5 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -30,16 +30,17 @@ import ( // WindowPoStScheduler watches the chain though the changeHandler, which in turn // turn calls the scheduler when the time arrives to do work. type WindowPoStScheduler struct { - api fullNodeFilteredAPI - feeCfg config.MinerFeeConfig - addrSel *AddressSelector - prover storage.Prover - verifier ffiwrapper.Verifier - faultTracker sectorstorage.FaultTracker - proofType abi.RegisteredPoStProof - partitionSectors uint64 - maxPartitionsPerMessage int - ch *changeHandler + api fullNodeFilteredAPI + feeCfg config.MinerFeeConfig + addrSel *AddressSelector + prover storage.Prover + verifier ffiwrapper.Verifier + faultTracker sectorstorage.FaultTracker + proofType abi.RegisteredPoStProof + partitionSectors uint64 + maxPartitionsPerPostMessage int + maxPartitionsPerRecoveryMessage int + ch *changeHandler actor address.Address @@ -66,17 +67,18 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI, } return &WindowPoStScheduler{ - api: api, - feeCfg: cfg, - addrSel: as, - prover: sp, - verifier: verif, - faultTracker: ft, - proofType: mi.WindowPoStProofType, - partitionSectors: mi.WindowPoStPartitionSectors, - maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage, - - actor: actor, + api: api, + feeCfg: cfg, + addrSel: as, + prover: sp, + verifier: verif, + faultTracker: ft, + proofType: mi.WindowPoStProofType, + partitionSectors: mi.WindowPoStPartitionSectors, + + maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage, + maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage, + actor: actor, evtTypes: [...]journal.EventType{ evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"), evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),
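
Taken together, the two patches expose two independent knobs under the [Proving] section of the miner config. As an illustration only (the values below are examples, not defaults; both options default to 0, meaning network limit for PoSt and no limit for recoveries), a miner whose SubmitWindowedPoSt or DeclareFaultsRecovered messages run up against the block gas limit might set:

  [Proving]
    # illustrative value: cap each SubmitWindowedPoSt message at 2 partitions (network limit is 10 in nv16)
    MaxPartitionsPerPoStMessage = 2
    # illustrative value: declare recoveries one partition per message, as the config comment suggests for recovery-heavy deadlines
    MaxPartitionsPerRecoveryMessage = 1

As the added documentation notes, lower values trade gas efficiency for smaller messages: more messages are sent per deadline, but each one carries a lower gas limit.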