fix: wdpost: disabled post worker handling #10394

Merged · 5 commits · Mar 6, 2023
Changes from all commits
2 changes: 2 additions & 0 deletions itests/kit/ensemble.go
@@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
 		MinerNode:      minerNode,
 		RemoteListener: rl,
 		options:        options,
+
+		Stop: func(ctx context.Context) error { return nil },
 	}
 
 	n.inactive.workers = append(n.inactive.workers, worker)
6 changes: 6 additions & 0 deletions itests/kit/rpc.go
@@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
 	require.NoError(t, err)
 	t.Cleanup(stop)
 
+	m.Stop = func(ctx context.Context) error {
+		srv.Close()
+		srv.CloseClientConnections()
+		return nil
+	}
+
 	m.ListenAddr, m.Worker = maddr, cl
 	return m
 }
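Together with the no-op default added in ensemble.go above, this gives every TestWorker a Stop hook that is always safe to call; for RPC-backed workers it tears down the test RPC server so a test can simulate a worker abruptly dropping offline (srv here appears to be an httptest.Server, which is what provides both Close and CloseClientConnections). A minimal standalone sketch of the same pattern, using only the standard library; fakeWorker, newFakeWorker, and serveRPC are illustrative names, not lotus APIs:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// fakeWorker mimics the TestWorker pattern: Stop defaults to a no-op so it is
// always safe to call, and is overridden once the worker is actually served
// over RPC.
type fakeWorker struct {
	ListenAddr string
	Stop       func(ctx context.Context) error
}

func newFakeWorker() *fakeWorker {
	return &fakeWorker{
		// default: nothing to tear down yet
		Stop: func(ctx context.Context) error { return nil },
	}
}

// serveRPC exposes the worker over HTTP and swaps in a Stop that shuts the
// server down and drops in-flight client connections, simulating a worker
// that abruptly goes offline.
func (w *fakeWorker) serveRPC(handler http.Handler) {
	srv := httptest.NewServer(handler)
	w.ListenAddr = srv.Listener.Addr().String()
	w.Stop = func(ctx context.Context) error {
		srv.Close()                  // stop accepting new connections
		srv.CloseClientConnections() // kill any that are still open
		return nil
	}
}

func main() {
	w := newFakeWorker()
	_ = w.Stop(context.Background()) // safe even before serveRPC

	w.serveRPC(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(rw, "ok")
	}))

	if resp, err := http.Get("http://" + w.ListenAddr); err == nil {
		fmt.Println("before Stop:", resp.Status)
		resp.Body.Close()
	}

	_ = w.Stop(context.Background())
	_, err := http.Get("http://" + w.ListenAddr)
	fmt.Println("after Stop, request fails:", err != nil)
}
```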
53 changes: 41 additions & 12 deletions itests/worker_test.go
@@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 
 	sectors := 2 * 48 * 2
 
-	client, miner, _, ens := kit.EnsembleWorker(t,
+	client, miner, _, _ := kit.EnsembleWorker(t,
 		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
 		kit.LatestActorsAt(-1),
 		kit.ThroughRPC(),
@@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
 	require.NoError(t, err)
 
-	bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]
-
-	di = di.NextNotElapsed()
-
-	t.Log("Running one proving period")
-	waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
-	client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
-
-	t.Log("Waiting for post message")
-	bm.Stop()
-
 	tryDl := func(dl uint64) {
 		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
 		require.NoError(t, err)
@@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	tryDl(0)
 	tryDl(40)
 	tryDl(di.Index + 4)
+}
 
-	lastPending, err := client.MpoolPending(ctx, types.EmptyTSK)
+func TestWindowPostWorkerDisconnected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	_ = logging.SetLogLevel("storageminer", "INFO")
+
+	sectors := 2 * 48 * 2
+
+	_, miner, badWorker, ens := kit.EnsembleWorker(t,
+		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
+		kit.LatestActorsAt(-1),
+		kit.ThroughRPC(),
+		kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
+
+	var goodWorker kit.TestWorker
+	ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()
+
+	// wait for all workers
+	require.Eventually(t, func() bool {
+		w, err := miner.WorkerStats(ctx)
+		require.NoError(t, err)
+		return len(w) == 3 // 2 post + 1 miner-builtin
+	}, 10*time.Second, 100*time.Millisecond)
+
+	tryDl := func(dl uint64) {
+		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
+		require.NoError(t, err)
+		require.Len(t, p, 1)
+		require.Equal(t, dl, p[0].Deadline)
+	}
+	tryDl(0) // this will run on the not-yet-bad badWorker
+
+	err := badWorker.Stop(ctx)
 	require.NoError(t, err)
-	require.Len(t, lastPending, 0)
+
+	tryDl(10) // will fail on the badWorker, then should retry on the goodWorker
+
+	time.Sleep(15 * time.Second)
+
+	tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
+}
 
 func TestSchedulerRemoveRequest(t *testing.T) {
2 changes: 1 addition & 1 deletion storage/sealer/manager_post.go
@@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
 			skipped = append(skipped, sk...)
 
 			if err != nil {
-				retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
+				retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
 			}
 			flk.Unlock()
 		}
38 changes: 31 additions & 7 deletions storage/sealer/sched_post.go
@@ -2,12 +2,15 @@ package sealer
 
 import (
 	"context"
+	"errors"
 	"math/rand"
 	"sync"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
 
+	"github.com/filecoin-project/go-jsonrpc"
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/storage/paths"
@@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 		}
 	}()
 
-	selected := candidates[0]
-	worker := ps.workers[selected.id]
+	var rpcErrs error
 
-	return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
-		ps.lk.Unlock()
-		defer ps.lk.Lock()
+	for i, selected := range candidates {
+		worker := ps.workers[selected.id]
 
-		return work(ctx, worker.workerRpc)
-	})
+		err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
+			ps.lk.Unlock()
+			defer ps.lk.Lock()
+
+			return work(ctx, worker.workerRpc)
+		})
+		if err == nil {
+			return nil
+		}
+
+		// if the error is RPCConnectionError, try another worker, if not, return the error
+		if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
+			return err
+		}
+
+		log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
+		rpcErrs = multierror.Append(rpcErrs, err)
+	}
+
+	return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
 }
 
 type candidateWorker struct {
@@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 	for wid, wr := range ps.workers {
 		needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
 
+		if !wr.Enabled {
+			log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
+			continue
+		}
+
 		if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
 			continue
 		}
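Taken together, the new Schedule loop walks the ready candidates in order, returns on the first success, moves on to the next candidate only when the failure is an RPC connection error, and reports the aggregated connection errors if every candidate fails; readyWorkers additionally skips workers that have been marked disabled. Below is a stdlib-only sketch of that control flow under stated substitutions: connError and candidate are illustrative stand-ins (the real code checks *jsonrpc.RPCConnectionError against candidateWorker entries), and errors.Join (Go 1.20+) replaces hashicorp/go-multierror purely to keep the sketch dependency-free:

```go
package main

import (
	"errors"
	"fmt"
)

// connError stands in for *jsonrpc.RPCConnectionError from go-jsonrpc.
type connError struct{ worker string }

func (e *connError) Error() string { return "RPC connection to " + e.worker + " failed" }

// candidate is a stand-in for the scheduler's candidateWorker plus the
// worker state it consults (Enabled, and the work to run on it).
type candidate struct {
	id      string
	enabled bool
	work    func() error
}

// schedule mirrors the shape of the new poStScheduler.Schedule loop: skip
// disabled workers, return on the first success, retry the next candidate
// only on connection errors, and aggregate those errors if every candidate
// fails.
func schedule(candidates []candidate) error {
	var rpcErrs error

	for i, c := range candidates {
		if !c.enabled {
			continue // analogous to the readyWorkers !wr.Enabled check
		}

		err := c.work()
		if err == nil {
			return nil
		}

		// Non-connection errors are real proving failures: surface them immediately.
		if !errors.As(err, new(*connError)) {
			return err
		}

		fmt.Printf("worker %s unreachable (candidate %d/%d), trying next\n", c.id, i+1, len(candidates))
		rpcErrs = errors.Join(rpcErrs, err)
	}

	return fmt.Errorf("got RPC errors from all workers: %w", rpcErrs)
}

func main() {
	err := schedule([]candidate{
		{id: "disabled", enabled: false},
		{id: "bad", enabled: true, work: func() error { return &connError{worker: "bad"} }},
		{id: "good", enabled: true, work: func() error { return nil }},
	})
	fmt.Println("scheduled:", err == nil) // the good worker picks the job up
}
```

Returning immediately on non-connection errors keeps genuine proving failures visible instead of silently retrying them on another worker; only unreachable workers trigger the fallback.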