Skip to content

Commit

Permalink
feat: sched: Cache worker calls
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 28, 2022
1 parent baa1072 commit 73fbc4b
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 42 deletions.
50 changes: 50 additions & 0 deletions lib/lazy/getonce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package lazy

import (
"context"
"sync"
)

type Lazy[T any] struct {
Get func() (T, error)

once sync.Once

val T
err error
}

func MakeLazy[T any](get func() (T, error)) *Lazy[T] {
return &Lazy[T]{
Get: get,
}
}

func (l *Lazy[T]) Val() (T, error) {
l.once.Do(func() {
l.val, l.err = l.Get()
})
return l.val, l.err
}

type LazyCtx[T any] struct {
Get func(context.Context) (T, error)

once sync.Once

val T
err error
}

func MakeLazyCtx[T any](get func(ctx context.Context) (T, error)) *LazyCtx[T] {
return &LazyCtx[T]{
Get: get,
}
}

func (l *LazyCtx[T]) Val(ctx context.Context) (T, error) {
l.once.Do(func() {
l.val, l.err = l.Get(ctx)
})
return l.val, l.err
}
16 changes: 9 additions & 7 deletions storage/sealer/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ const mib = 1 << 20

type WorkerAction func(ctx context.Context, w Worker) error

type SchedWorker interface {
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
Paths(context.Context) ([]storiface.StoragePath, error)
Utilization() float64
}

type WorkerSelector interface {
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error)

Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b
}

type Scheduler struct {
Expand Down Expand Up @@ -81,11 +87,7 @@ type Scheduler struct {

type WorkerHandle struct {
workerRpc Worker

tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex


Info storiface.WorkerInfo

preparing *ActiveResources // use with WorkerHandle.lk
Expand Down
12 changes: 9 additions & 3 deletions storage/sealer/sched_assigner_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sealer

import (
"context"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -37,6 +38,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
*/

cachedWorkers := &schedWorkerCache{
Workers: sh.Workers,
cached: map[storiface.WorkerID]*cachedSchedWorker{},
}

windowsLen := len(sh.OpenWindows)
queueLen := sh.SchedQueue.Len()

Expand Down Expand Up @@ -81,7 +87,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
var havePreferred bool

for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
worker, ok := cachedWorkers.Get(windowRequest.Worker)
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker)
// TODO: How to move forward here?
Expand Down Expand Up @@ -143,8 +149,8 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}

wi := sh.Workers[wii]
wj := sh.Workers[wji]
wi, _ := cachedWorkers.Get(wii)
wj, _ := cachedWorkers.Get(wji)

rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
defer cancel()
Expand Down
22 changes: 1 addition & 21 deletions storage/sealer/sched_resources.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package sealer

import (
"context"
"sync"
"time"

"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"sync"
)

type ActiveResources struct {
Expand Down Expand Up @@ -185,20 +182,3 @@ func (wh *WorkerHandle) Utilization() float64 {

return u
}

var tasksCacheTimeout = 30 * time.Second

func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
wh.tasksLk.Lock()
defer wh.tasksLk.Unlock()

if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
if err != nil {
return nil, err
}
wh.tasksUpdate = time.Now()
}

return wh.tasksCache, nil
}
69 changes: 69 additions & 0 deletions storage/sealer/sched_worker_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sealer

import (
"context"
"sync"

"github.com/filecoin-project/lotus/lib/lazy"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// schedWorkerCache caches scheduling-related calls to workers
type schedWorkerCache struct {
Workers map[storiface.WorkerID]*WorkerHandle

lk sync.Mutex
cached map[storiface.WorkerID]*cachedSchedWorker
}

func (s *schedWorkerCache) Get(id storiface.WorkerID) (*cachedSchedWorker, bool) {
s.lk.Lock()
defer s.lk.Unlock()

if _, found := s.cached[id]; !found {
if _, found := s.Workers[id]; !found {
return nil, false
}

whnd := s.Workers[id]

s.cached[id] = &cachedSchedWorker{
tt: lazy.MakeLazyCtx(whnd.workerRpc.TaskTypes),
paths: lazy.MakeLazyCtx(whnd.workerRpc.Paths),
utilization: lazy.MakeLazy(func() (float64, error) {
return whnd.Utilization(), nil
}),

Enabled: whnd.Enabled,
Info: whnd.Info,
}
}

return s.cached[id], true
}

type cachedSchedWorker struct {
tt *lazy.LazyCtx[map[sealtasks.TaskType]struct{}]
paths *lazy.LazyCtx[[]storiface.StoragePath]
utilization *lazy.Lazy[float64]

Enabled bool
Info storiface.WorkerInfo
}

func (c *cachedSchedWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return c.tt.Val(ctx)
}

func (c *cachedSchedWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
return c.paths.Get(ctx)
}

func (c *cachedSchedWorker) Utilization() float64 {
// can't error
v, _ := c.utilization.Val()
return v
}

var _ SchedWorker = &cachedSchedWorker{}
6 changes: 3 additions & 3 deletions storage/sealer/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, p
}
}

func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return requested == storiface.FTNone, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
6 changes: 3 additions & 3 deletions storage/sealer/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto
}
}

func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return requested == storiface.FTNone, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
6 changes: 3 additions & 3 deletions storage/sealer/selector_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storifa
}
}

func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -39,7 +39,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return (ok && s.allowRemote) || pref, pref, nil
}

func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/selector_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector {
return &taskSelector{}
}

func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return supported, false, nil
}

func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b SchedWorker) (bool, error) {
atasks, err := a.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand Down

0 comments on commit 73fbc4b

Please sign in to comment.