Skip to content

Commit

Permalink
Merge pull request #4779 from filecoin-project/feat/work-track-hostname
Browse files Browse the repository at this point in the history
storage: Track worker hostnames with work
  • Loading branch information
magik6k authored Nov 10, 2020
2 parents 2ae0edc + 5caa277 commit 18aa97f
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 18 deletions.
17 changes: 13 additions & 4 deletions cmd/lotus-storage-miner/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,31 @@ var sealingJobsCmd = &cli.Command{

for _, l := range lines {
state := "running"
if l.RunWait > 0 {
switch {
case l.RunWait > 0:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
}
if l.RunWait == -1 {
case l.RunWait == storiface.RWRetDone:
state = "ret-done"
case l.RunWait == storiface.RWReturned:
state = "returned"
case l.RunWait == storiface.RWRetWait:
state = "ret-wait"
}
dur := "n/a"
if !l.Start.IsZero() {
dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String()
}

hostname, ok := workerHostnames[l.wid]
if !ok {
hostname = l.Hostname
}

_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[10:]),
l.Sector.Number,
hex.EncodeToString(l.wid[5:]),
workerHostnames[l.wid],
hostname,
l.Task.Short(),
state,
dur)
Expand Down
84 changes: 83 additions & 1 deletion extern/sector-storage/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)

err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces))
err := m.startWork(ctx, w, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces))
if err != nil {
return err
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)

err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
if err != nil {
return err
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)

err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
if err != nil {
return err
}
Expand Down Expand Up @@ -520,7 +520,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
selector := newTaskSelector()

err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealCommit2(ctx, sector, phase1Out))
err := m.startWork(ctx, w, wk)(w.SealCommit2(ctx, sector, phase1Out))
if err != nil {
return err
}
Expand Down
33 changes: 30 additions & 3 deletions extern/sector-storage/manager_calltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"os"
"time"

"golang.org/x/xerrors"

Expand Down Expand Up @@ -41,6 +42,9 @@ type WorkState struct {

WorkerCall storiface.CallID // Set when entering wsRunning
WorkError string // Status = wsDone, set when failed to start work

WorkerHostname string // hostname of last worker handling this job
StartTime int64 // unix seconds
}

func newWorkID(method sealtasks.TaskType, params ...interface{}) (WorkID, error) {
Expand Down Expand Up @@ -85,8 +89,7 @@ func (m *Manager) setupWorkTracker() {
log.Errorf("cleannig up work state for %s", wid)
}
case wsDone:
// realistically this shouldn't ever happen as we return results
// immediately after getting them
// can happen after restart, abandoning work, and another restart
log.Warnf("dropping done work, no result, wid %s", wid)

if err := m.work.Get(wid).End(); err != nil {
Expand Down Expand Up @@ -167,8 +170,16 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
}, nil
}

func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storiface.CallID, err error) error {
func (m *Manager) startWork(ctx context.Context, w Worker, wk WorkID) func(callID storiface.CallID, err error) error {
return func(callID storiface.CallID, err error) error {
var hostname string
info, ierr := w.Info(ctx)
if ierr != nil {
hostname = "[err]"
} else {
hostname = info.Hostname
}

m.workLk.Lock()
defer m.workLk.Unlock()

Expand All @@ -194,6 +205,8 @@ func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storifac
ws.Status = wsRunning
}
ws.WorkerCall = callID
ws.WorkerHostname = hostname
ws.StartTime = time.Now().Unix()
return nil
})
if err != nil {
Expand Down Expand Up @@ -379,6 +392,20 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri

m.results[wid] = res

err = m.work.Get(wid).Mutate(func(ws *WorkState) error {
ws.Status = wsDone
return nil
})
if err != nil {
// in the unlikely case:
// * manager has restarted, and we're still tracking this work, and
// * the work is abandoned (storage-fsm doesn't do a matching call on the sector), and
// * the call is returned from the worker, and
// * this errors
// the user will get jobs stuck in ret-wait state
log.Errorf("marking work as done: %+v", err)
}

_, found := m.waitRes[wid]
if found {
close(m.waitRes[wid])
Expand Down
24 changes: 19 additions & 5 deletions extern/sector-storage/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,26 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
continue
}

var ws WorkState
if err := m.work.Get(work).Get(&ws); err != nil {
log.Errorf("WorkerJobs: get work %s: %+v", work, err)
}

wait := storiface.RWRetWait
if _, ok := m.results[work]; ok {
wait = storiface.RWReturned
}
if ws.Status == wsDone {
wait = storiface.RWRetDone
}

out[uuid.UUID{}] = append(out[uuid.UUID{}], storiface.WorkerJob{
ID: id,
Sector: id.Sector,
Task: work.Method,
RunWait: -1,
Start: time.Time{},
ID: id,
Sector: id.Sector,
Task: work.Method,
RunWait: wait,
Start: time.Unix(ws.StartTime, 0),
Hostname: ws.WorkerHostname,
})
}

Expand Down
15 changes: 14 additions & 1 deletion extern/sector-storage/storiface/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,26 @@ type WorkerStats struct {
CpuUse uint64 // nolint
}

// Negative WorkerJob.RunWait values. Non-negative values mean the job is
// running (0) or assigned to a worker queue (1+); the negative values below
// are reported for tracked calls listed under the zero worker ID.
const (
// RWRetWait is the default for such jobs: no result is recorded for the
// work and its stored state is not done. Displayed as "ret-wait".
RWRetWait = -1
// RWReturned means a result for the work is present in the manager's
// results map. Displayed as "returned".
RWReturned = -2
// RWRetDone means the stored work state has status done; it takes
// precedence over RWReturned. Displayed as "ret-done".
RWRetDone = -3
)

type WorkerJob struct {
ID CallID
Sector abi.SectorID
Task sealtasks.TaskType

RunWait int // -1 - ret-wait, 0 - running, 1+ - assigned
// 1+ - assigned
// 0 - running
// -1 - ret-wait
// -2 - returned
// -3 - ret-done
RunWait int
Start time.Time

Hostname string `json:",omitempty"` // optional, set for ret-wait jobs
}

type CallID struct {
Expand Down

0 comments on commit 18aa97f

Please sign in to comment.