Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Bind PC1-PC2: Sector storage groups #155

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/google/uuid"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/venus-sealer/constants"
"github.com/filecoin-project/venus-sealer/sector-storage/stores"
Expand All @@ -24,8 +23,6 @@ type WorkerAPI interface {

TaskNumbers(context.Context) (string, error)

SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error)

storiface.WorkerCalls

TaskDisable(ctx context.Context, tt types.TaskType) error
Expand Down
5 changes: 0 additions & 5 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type WorkerStruct struct {
TaskEnable func(ctx context.Context, tt types.TaskType) error `perm:"admin"`

TaskNumbers func(context.Context) (string, error) `perm:"admin"`
SectorExists func(context.Context, types.TaskType, storage.SectorRef) (bool, error) `perm:"admin"`

Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Expand Down Expand Up @@ -131,10 +130,6 @@ func (w *WorkerStruct) TaskNumbers(ctx context.Context) (string, error) {
return w.Internal.TaskNumbers(ctx)
}

func (w *WorkerStruct) SectorExists(ctx context.Context, task types.TaskType, sector storage.SectorRef) (bool, error) {
return w.Internal.SectorExists(ctx, task, sector)
}

func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}
Expand Down
19 changes: 18 additions & 1 deletion app/venus-sealer/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ over time
Name: "store",
Usage: "(for init) use path for long-term storage",
},
&cli.StringSliceFlag{
Name: "groups",
Usage: "path group names",
},
&cli.StringSliceFlag{
Name: "allow-to",
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
},
},
Action: func(cctx *cli.Context) error {
storageAPI, closer, err := api.GetStorageMinerAPI(cctx)
Expand Down Expand Up @@ -125,6 +133,8 @@ over time
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
Groups: cctx.StringSlice("groups"),
AllowTo: cctx.StringSlice("allow-to"),
}

if !(cfg.CanStore || cfg.CanSeal) {
Expand Down Expand Up @@ -259,10 +269,17 @@ var storageListCmd = &cli.Command{
if si.CanStore {
fmt.Print(color.CyanString("Store"))
}
fmt.Println("")
} else {
fmt.Print(color.HiYellowString("Use: ReadOnly"))
}
fmt.Println("")

if len(si.Groups) > 0 {
fmt.Printf("\tGroups: %s\n", strings.Join(si.Groups, ", "))
}
if len(si.AllowTo) > 0 {
fmt.Printf("\tAllowTo: %s\n", strings.Join(si.AllowTo, ", "))
}

if localPath, ok := local[s.ID]; ok {
fmt.Printf("\tLocal: %s\n", color.GreenString(localPath))
Expand Down
6 changes: 0 additions & 6 deletions app/venus-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ var runCmd = &cli.Command{
Usage: "total number of task",
Value: 100,
},
&cli.BoolFlag{
Name: "bindP1P2",
Usage: "P1 and P2 phase tasks are bound to the same machine",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
log.Info("Starting venus worker")
Expand Down Expand Up @@ -418,7 +413,6 @@ var runCmd = &cli.Command{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
TaskTotal: cctx.Int64("task-total"),
IsBindP1P2: cctx.Bool("bindP1P2"),
}, remote, localStore, nodeApi, nodeApi, wsts),
localStore: localStore,
ls: localStorage,
Expand Down
10 changes: 10 additions & 0 deletions app/venus-worker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ var storageAttachCmd = &cli.Command{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
&cli.StringSliceFlag{
Name: "groups",
Usage: "path group names",
},
&cli.StringSliceFlag{
Name: "allow-to",
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
},
},
Action: func(cctx *cli.Context) error {
workerApi, closer, err := api.GetWorkerAPI(cctx)
Expand Down Expand Up @@ -83,6 +91,8 @@ var storageAttachCmd = &cli.Command{
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
Groups: cctx.StringSlice("groups"),
AllowTo: cctx.StringSlice("allow-to"),
}

if !(cfg.CanStore || cfg.CanSeal) {
Expand Down
2 changes: 0 additions & 2 deletions sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ type Worker interface {

TaskNumbers(context.Context) (string, error)

SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error)

// Returns paths accessible to the worker
Paths(context.Context) ([]stores.StoragePath, error)

Expand Down
4 changes: 0 additions & 4 deletions sector-storage/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ func (t *schedTestWorker) TaskNumbers(ctx context.Context) (string, error) {
return "0-0", nil
}

func (s *schedTestWorker) SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) {
return true, nil
}

func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return s.paths, nil
}
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ func (s *allocSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Reg
return false, nil
}

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ func (s *existingSelector) Ok(ctx context.Context, task types.TaskType, spt abi.
return false, nil
}

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions sector-storage/selector_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ func (s *taskSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Regi
}
_, supported := tasks[task]

// whether the task of the previous stage is on this machine
bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector)
if err != nil {
return false, xerrors.Errorf("getting supported worker for same machine: %w", err)
}
if !bExist {
return false, nil
}

// Check the number of tasks
taskNum, err := whnd.workerRpc.TaskNumbers(ctx)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions sector-storage/stores/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
// filesystem, local or networked / shared by multiple machines
type ID string

type Group = string

type StorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Expand All @@ -34,6 +36,9 @@ type StorageInfo struct {

CanSeal bool
CanStore bool

Groups []Group
AllowTo []Group
}

type HealthReport struct {
Expand Down Expand Up @@ -163,6 +168,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
i.stores[si.ID].info.MaxStorage = si.MaxStorage
i.stores[si.ID].info.CanSeal = si.CanSeal
i.stores[si.ID].info.CanStore = si.CanStore
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo

return nil
}
Expand Down Expand Up @@ -268,6 +275,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
storageIDs := map[ID]uint64{}
isprimary := map[ID]bool{}

allowTo := map[Group]struct{}{}

for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 {
continue
Expand Down Expand Up @@ -299,6 +308,14 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
urls[k] = rl.String()
}

if allowTo != nil && len(st.info.AllowTo) > 0 {
for _, group := range st.info.AllowTo {
allowTo[group] = struct{}{}
}
} else {
allowTo = nil // allow to any
}

out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
Expand Down Expand Up @@ -341,6 +358,22 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
continue
}

if allowTo != nil {
allow := false
for _, group := range st.info.Groups {
if _, found := allowTo[group]; found {
log.Debugf("path %s in allowed group %s", st.info.ID, group)
allow = true
break
}
}

if !allow {
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", st.info.ID, allowTo, st.info.Groups)
continue
}
}

urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
Expand Down
Loading