change storage schedule #9689

Closed
wants to merge 1 commit into from
2 changes: 1 addition & 1 deletion chain/wallet/ledger/ledger.go
@@ -10,7 +10,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
ledgerfil "github.com/whyrusleeping/ledger-filecoin-go"
ledgerfil "github.com/zondax/ledger-filecoin-go"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
2 changes: 1 addition & 1 deletion cmd/lotus-shed/ledger.go
@@ -12,7 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/urfave/cli/v2"
ledgerfil "github.com/whyrusleeping/ledger-filecoin-go"
ledgerfil "github.com/zondax/ledger-filecoin-go"

"github.com/filecoin-project/lotus/chain/types"
ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
30 changes: 30 additions & 0 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
@@ -14,6 +14,8 @@ import (
"math/bits"
"os"
"runtime"
"strings"
"time"

"github.com/filecoin-project/go-state-types/proof"

@@ -746,6 +748,34 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storage.SectorRef,
}
}

//
// Write-scheduling optimization
// Description:
// Instead of transferring the sector back to miner storage only after the whole
// sealing process has finished, the sector is transferred at P2 to a storage
// backend shared by the miner and the worker, removing the final transfer back
// to the miner.
//

dst := strings.Replace(paths.Sealed, "sealed", "sealed_bs", 1)
start := time.Now()
multiwrite := false // toggle: parallel O_DIRECT writer vs. plain io.Copy fallback
if multiwrite {
// copy the sector from sealed into sealed_bs
log.Infof("sector %d: starting multiwrite.", sector.ID.Number)
// block size and parallelism are tunable here
err := MultiWrite(paths.Sealed, dst, 1024, 32)
if err != nil {
log.Errorf("multiWrite err: %+v", err)
}
} else {
log.Infof("sector %d: starting simplewrite.", sector.ID.Number)
write, err := SimpleWrite(paths.Sealed, dst)
if err != nil {
log.Errorf("simplewrite err: %v, %v", err, write)
}
}
elapsed := time.Since(start)
log.Infof("sector %d: write to backend completed in %v", sector.ID.Number, elapsed)

return storage.SectorCids{
Unsealed: unsealedCID,
Sealed: sealedCID,
238 changes: 238 additions & 0 deletions extern/sector-storage/ffiwrapper/writebackend.go
@@ -0,0 +1,238 @@
package ffiwrapper

import (
"errors"
"fmt"
"io"
"os"
"sync"
"syscall"
"time"
)

type task struct {
buf []byte
offset int64
wg *sync.WaitGroup
}

type processSize struct {
len int64
pos int64
}

func calculatePsize(srcSize, blockSize, parallel int64) ([]processSize, error) {
if srcSize <= 0 || blockSize <= 0 || parallel <= 0 {
return nil, errors.New("invalid argument")
}
blockCount := srcSize / blockSize
if blockCount == 0 {
return []processSize{{len: srcSize, pos: 0}}, nil
}

var psizes []processSize
parallelBlockCount := blockCount / parallel
blockCountMod := blockCount % parallel
pos := int64(0)
for i := int64(0); i < parallel; i++ {
perBlockCount := parallelBlockCount
if i < blockCountMod {
perBlockCount++
}
psize := processSize{
len: perBlockCount * blockSize,
pos: pos,
}
pos += psize.len

if psize.len != 0 {
psizes = append(psizes, psize)
}
}
if len(psizes) > 0 {
lastPsize := psizes[len(psizes)-1]
if srcSize%blockSize != 0 {
lastPsize.len = lastPsize.len + srcSize%blockSize
}
psizes[len(psizes)-1] = lastPsize
}
return psizes, nil
}

// MultiWrite copies src to dst using multiple parallel writers.
// blockSize is the size of each I/O operation.
// parallel is the number of concurrent writers.
func MultiWrite(src string, dst string, blockSize int64, parallel int64) error {
// param check
srcInfo, err := os.Stat(src)
if nil != err {
return err
}
srcSize := srcInfo.Size()
if srcSize == 0 {
return fmt.Errorf("src file '%s' size is 0", src)
}

if _, err := os.Stat(dst); nil == err {
return errors.New("dst file is exist")
} else if nil != err && !os.IsNotExist(err) {
return err
}
if parallel < 0 {
return errors.New("parallel must greater than 0")
}
if blockSize < 0 {
return errors.New("blockSize must greater than 0")
}

fsrc, err := os.Open(src)
if err != nil {
return err
}
defer fsrc.Close()

fdst, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC|syscall.O_DIRECT, 0666)
if nil != err {
return err
}
defer fdst.Close()

// first, truncate dst to the source file size
if err = fdst.Truncate(srcSize); err != nil {
log.Errorf("Truncate file %s failed, err %+v", dst, err)
return err
}

psizes, err := calculatePsize(srcSize, blockSize, parallel)
if err != nil {
return err
}

// waits for the read workers
workWg := sync.WaitGroup{}
// waits for the queued write tasks
writeWg := sync.WaitGroup{}
// channel of pending write tasks
writeCh := make(chan *task, parallel)
// channel for stopping the write scheduler; buffered so a late sender does not block
stopCh := make(chan bool, 1)
// buffered so late error reports from readers and writers do not block
errCh := make(chan error, 2*parallel)

// write scheduler
go func() {
writeLimit := make(chan struct{}, parallel)
for {
writeLimit <- struct{}{}
select {
case t := <-writeCh:
go func() {
err := t.write(fdst)
if err != nil {
errCh <- err
}
<-writeLimit
}()
case <-stopCh:
return
}
}
}()

// read file worker
worker := func(length, pos int64) error {
defer workWg.Done()

// number of bytes read so far
rsize := int64(0)
roffset := pos
for {
if length == rsize {
break
}
buf := make([]byte, blockSize)
rlen, err := fsrc.ReadAt(buf, roffset)
if err == io.EOF {
buf = buf[:rlen]
} else if err != nil {
log.Errorf("read file %s err: %+v", fsrc.Name(), err)
return err
}
t := &task{
buf: buf,
offset: roffset,
wg: &writeWg,
}
writeWg.Add(1)
writeCh <- t

rsize += int64(rlen)
roffset += int64(rlen)
}
return nil
}

// start a read worker for each partition
for _, p := range psizes {
workWg.Add(1)
go func(length, pos int64) {
err := worker(length, pos)
if err != nil {
errCh <- err
}
}(p.len, p.pos)
}

doneCh := make(chan bool, 1) // buffered: nothing receives from doneCh on the error path
go func() {
workWg.Wait()
writeWg.Wait()
stopCh <- true
doneCh <- true
}()
var lastErr error
LOOP:
for {
select {
case lastErr = <-errCh:
stopCh <- true
break LOOP
case <-doneCh:
break LOOP
}
}
return lastErr
}

// write writes the task buffer to f at the task offset, retrying on error.
func (t *task) write(f *os.File) (err error) {
defer t.wg.Done()
// retry up to 3 times if WriteAt returns an error
for i := 0; i < 3; i++ {
if i != 0 {
log.Errorf("sleeping 10s before write retry %d", i)
time.Sleep(10 * time.Second)
}
_, err = f.WriteAt(t.buf, t.offset)
if err == nil {
break
}
log.Errorf("write file %s failed, err: %+v", f.Name(), err)
}
return err
}

// SimpleWrite copies src to dst with a single io.Copy and returns the number of bytes written.
func SimpleWrite(src string, dst string) (int64, error) {
srcSector, err := os.Open(src)
if err != nil {
return 0, err
}
defer srcSector.Close()
dstSector, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return 0, err
}
defer dstSector.Close()

return io.Copy(dstSector, srcSector)
}
19 changes: 13 additions & 6 deletions extern/sector-storage/manager.go
@@ -583,13 +583,16 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}

unsealed := storiface.FTUnsealed
sealed := storiface.FTSealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
// unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
unsealedAndsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed|storiface.FTSealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}

if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
// if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
if len(unsealedAndsealedStores) == 0 {
unsealed = storiface.FTNone
}
}
@@ -609,7 +612,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|unsealed, pathType, storiface.AcquireMove),
@@ -621,18 +624,22 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return err
}

fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
fetchSel := newAllocSelector(m.index, storiface.FTCache, storiface.PathStorage)
// fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}
moveSealed := sealed

err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
// m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
m.schedFetch(sector, storiface.FTCache|moveUnsealed|moveSealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
// _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|moveUnsealed))
return err
})
if err != nil {