Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
Bind PC1-PC2: Sector storage groups (#155)
Browse files Browse the repository at this point in the history
Co-authored-by: 一页素书 <[email protected]>
  • Loading branch information
diwufeiwen and ta0li authored Dec 16, 2021
1 parent d7372b4 commit fd1f463
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 79 deletions.
3 changes: 0 additions & 3 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/google/uuid"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/venus-sealer/constants"
"github.com/filecoin-project/venus-sealer/sector-storage/stores"
Expand All @@ -24,8 +23,6 @@ type WorkerAPI interface {

TaskNumbers(context.Context) (string, error)

SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error)

storiface.WorkerCalls

TaskDisable(ctx context.Context, tt types.TaskType) error
Expand Down
5 changes: 0 additions & 5 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type WorkerStruct struct {
TaskEnable func(ctx context.Context, tt types.TaskType) error `perm:"admin"`

TaskNumbers func(context.Context) (string, error) `perm:"admin"`
SectorExists func(context.Context, types.TaskType, storage.SectorRef) (bool, error) `perm:"admin"`

Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Expand Down Expand Up @@ -131,10 +130,6 @@ func (w *WorkerStruct) TaskNumbers(ctx context.Context) (string, error) {
return w.Internal.TaskNumbers(ctx)
}

func (w *WorkerStruct) SectorExists(ctx context.Context, task types.TaskType, sector storage.SectorRef) (bool, error) {
return w.Internal.SectorExists(ctx, task, sector)
}

func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}
Expand Down
19 changes: 18 additions & 1 deletion app/venus-sealer/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ over time
Name: "store",
Usage: "(for init) use path for long-term storage",
},
&cli.StringSliceFlag{
Name: "groups",
Usage: "path group names",
},
&cli.StringSliceFlag{
Name: "allow-to",
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
},
},
Action: func(cctx *cli.Context) error {
storageAPI, closer, err := api.GetStorageMinerAPI(cctx)
Expand Down Expand Up @@ -125,6 +133,8 @@ over time
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
Groups: cctx.StringSlice("groups"),
AllowTo: cctx.StringSlice("allow-to"),
}

if !(cfg.CanStore || cfg.CanSeal) {
Expand Down Expand Up @@ -259,10 +269,17 @@ var storageListCmd = &cli.Command{
if si.CanStore {
fmt.Print(color.CyanString("Store"))
}
fmt.Println("")
} else {
fmt.Print(color.HiYellowString("Use: ReadOnly"))
}
fmt.Println("")

if len(si.Groups) > 0 {
fmt.Printf("\tGroups: %s\n", strings.Join(si.Groups, ", "))
}
if len(si.AllowTo) > 0 {
fmt.Printf("\tAllowTo: %s\n", strings.Join(si.AllowTo, ", "))
}

if localPath, ok := local[s.ID]; ok {
fmt.Printf("\tLocal: %s\n", color.GreenString(localPath))
Expand Down
6 changes: 0 additions & 6 deletions app/venus-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ var runCmd = &cli.Command{
Usage: "total number of task",
Value: 100,
},
&cli.BoolFlag{
Name: "bindP1P2",
Usage: "P1 and P2 phase tasks are bound to the same machine",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
log.Info("Starting venus worker")
Expand Down Expand Up @@ -418,7 +413,6 @@ var runCmd = &cli.Command{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
TaskTotal: cctx.Int64("task-total"),
IsBindP1P2: cctx.Bool("bindP1P2"),
}, remote, localStore, nodeApi, nodeApi, wsts),
localStore: localStore,
ls: localStorage,
Expand Down
10 changes: 10 additions & 0 deletions app/venus-worker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ var storageAttachCmd = &cli.Command{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
&cli.StringSliceFlag{
Name: "groups",
Usage: "path group names",
},
&cli.StringSliceFlag{
Name: "allow-to",
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
},
},
Action: func(cctx *cli.Context) error {
workerApi, closer, err := api.GetWorkerAPI(cctx)
Expand Down Expand Up @@ -83,6 +91,8 @@ var storageAttachCmd = &cli.Command{
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
Groups: cctx.StringSlice("groups"),
AllowTo: cctx.StringSlice("allow-to"),
}

if !(cfg.CanStore || cfg.CanSeal) {
Expand Down
2 changes: 0 additions & 2 deletions sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ type Worker interface {

TaskNumbers(context.Context) (string, error)

SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error)

// Returns paths accessible to the worker
Paths(context.Context) ([]stores.StoragePath, error)

Expand Down
4 changes: 0 additions & 4 deletions sector-storage/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ func (t *schedTestWorker) TaskNumbers(ctx context.Context) (string, error) {
return "0-0", nil
}

func (s *schedTestWorker) SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) {
return true, nil
}

func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return s.paths, nil
}
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ func (s *allocSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Reg
return false, nil
}

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ func (s *existingSelector) Ok(ctx context.Context, task types.TaskType, spt abi.
return false, nil
}

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ func (s *taskSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Regi
}
_, supported := tasks[task]

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions sector-storage/stores/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
// filesystem, local or networked / shared by multiple machines
type ID string

type Group = string

type StorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Expand All @@ -34,6 +36,9 @@ type StorageInfo struct {

CanSeal bool
CanStore bool

Groups []Group
AllowTo []Group
}

type HealthReport struct {
Expand Down Expand Up @@ -163,6 +168,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
i.stores[si.ID].info.MaxStorage = si.MaxStorage
i.stores[si.ID].info.CanSeal = si.CanSeal
i.stores[si.ID].info.CanStore = si.CanStore
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo

return nil
}
Expand Down Expand Up @@ -268,6 +275,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
storageIDs := map[ID]uint64{}
isprimary := map[ID]bool{}

allowTo := map[Group]struct{}{}

for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 {
continue
Expand Down Expand Up @@ -299,6 +308,14 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
urls[k] = rl.String()
}

if allowTo != nil && len(st.info.AllowTo) > 0 {
for _, group := range st.info.AllowTo {
allowTo[group] = struct{}{}
}
} else {
allowTo = nil // allow to any
}

out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
Expand Down Expand Up @@ -341,6 +358,22 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
continue
}

if allowTo != nil {
allow := false
for _, group := range st.info.Groups {
if _, found := allowTo[group]; found {
log.Debugf("path %s in allowed group %s", st.info.ID, group)
allow = true
break
}
}

if !allow {
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", st.info.ID, allowTo, st.info.Groups)
continue
}
}

urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
Expand Down
Loading

0 comments on commit fd1f463

Please sign in to comment.