Skip to content

Commit

Permalink
Merge pull request #8589 from filecoin-project/fix/sched-hang
Browse files Browse the repository at this point in the history
stores: Deduplicate parallel stat requests
  • Loading branch information
magik6k authored May 18, 2022
2 parents 33a05d3 + 086908e commit effee8c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 1 deletion.
8 changes: 8 additions & 0 deletions extern/sector-storage/fsutil/filesize_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"syscall"
"time"

"golang.org/x/xerrors"
)
Expand All @@ -15,6 +16,8 @@ type SizeInfo struct {
// FileSize returns bytes used by a file or directory on disk
// NOTE: We care about the allocated bytes, not file or directory size
func FileSize(path string) (SizeInfo, error) {
start := time.Now()

var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
Expand All @@ -32,6 +35,11 @@ func FileSize(path string) (SizeInfo, error) {
}
return err
})

if time.Now().Sub(start) >= 3*time.Second {
log.Warnw("very slow file size check", "took", time.Now().Sub(start), "path", path)
}

if err != nil {
if os.IsNotExist(err) {
return SizeInfo{}, os.ErrNotExist
Expand Down
6 changes: 5 additions & 1 deletion extern/sector-storage/stores/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type path struct {
}

func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
start := time.Now()

stat, err := ls.Stat(p.local)
if err != nil {
return fsutil.FsStat{}, xerrors.Errorf("stat %s: %w", p.local, err)
Expand Down Expand Up @@ -155,6 +157,8 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
}
}

log.Infow("storage stat", "took", time.Now().Sub(start), "reservations", len(p.reservations))

return stat, err
}

Expand All @@ -166,7 +170,7 @@ type URLs []string

func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
l := &Local{
localStorage: ls,
localStorage: newCachedLocalStorage(ls),
index: index,
urls: urls,

Expand Down
131 changes: 131 additions & 0 deletions extern/sector-storage/stores/localstorage_cached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package stores

import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
)

var StatTimeout = 5 * time.Second
var MaxDiskUsageDuration = time.Second

type cachedLocalStorage struct {
base LocalStorage

statLk sync.Mutex
stats *lru.Cache // path -> statEntry
pathDUs *lru.Cache // path -> *diskUsageEntry
}

func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
statCache, _ := lru.New(1024)
duCache, _ := lru.New(1024)

return &cachedLocalStorage{
base: ls,
stats: statCache,
pathDUs: duCache,
}
}

type statEntry struct {
stat fsutil.FsStat
time time.Time
}

type diskUsageEntry struct {
last diskUsageResult

usagePromise <-chan diskUsageResult
}

type diskUsageResult struct {
usage int64
time time.Time
}

func (c *cachedLocalStorage) GetStorage() (StorageConfig, error) {
return c.base.GetStorage()
}

func (c *cachedLocalStorage) SetStorage(f func(*StorageConfig)) error {
return c.base.SetStorage(f)
}

func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) {
c.statLk.Lock()
defer c.statLk.Unlock()

if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.(statEntry).time) < StatTimeout {
return v.(statEntry).stat, nil
}

// if we don't, get the stat
st, err := c.base.Stat(path)
if err == nil {
c.stats.Add(path, statEntry{
stat: st,
time: time.Now(),
})
}

return st, err
}

func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) {
c.statLk.Lock()
defer c.statLk.Unlock()

var entry *diskUsageEntry

if v, ok := c.pathDUs.Get(path); ok {
entry = v.(*diskUsageEntry)

// if we have recent cached entry, use that
if time.Now().Sub(entry.last.time) < StatTimeout {
return entry.last.usage, nil
}
} else {
entry = new(diskUsageEntry)
c.pathDUs.Add(path, entry)
}

// start a new disk usage request; this can take a while so start a
// goroutine, and if it doesn't return quickly, return either the
// previous value, or zero - that's better than potentially blocking
// here for a long time.
if entry.usagePromise == nil {
resCh := make(chan diskUsageResult, 1)
go func() {
du, err := c.base.DiskUsage(path)
if err != nil {
log.Errorw("error getting disk usage", "path", path, "error", err)
}
resCh <- diskUsageResult{
usage: du,
time: time.Now(),
}
}()
entry.usagePromise = resCh
}

// wait for the disk usage result; if it doesn't come, fallback on
// previous usage
select {
case du := <-entry.usagePromise:
entry.usagePromise = nil
entry.last = du
case <-time.After(MaxDiskUsageDuration):
log.Warnw("getting usage is slow, falling back to previous usage",
"path", path,
"fallback", entry.last.usage,
"age", time.Now().Sub(entry.last.time))
}

return entry.last.usage, nil
}

var _ LocalStorage = &cachedLocalStorage{}
6 changes: 6 additions & 0 deletions extern/sector-storage/stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}

// First make sure that no other goroutines are trying to fetch this sector;
// wait if there are any.
for {
r.fetchLk.Lock()

Expand Down Expand Up @@ -122,6 +124,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
r.fetchLk.Unlock()
}()

// Try to get the sector from local storage
paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
Expand All @@ -148,6 +151,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
odt = storiface.FsOverheadFinalized
}

// If any path types weren't found in local storage, try fetching them

// First reserve storage
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
Expand Down

0 comments on commit effee8c

Please sign in to comment.