Merge pull request #10913 from filecoin-project/feat/piecereader-perf
feat: piecereader: Allow parallel access
magik6k authored May 30, 2023
2 parents 3c67d0f + 4b5a665 commit 96fa081
Showing 15 changed files with 362 additions and 91 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-shed/sectors.go
@@ -653,7 +653,7 @@ fr32 padding is removed from the output.`,
return xerrors.Errorf("getting reader: %w", err)
}

rd, err := readStarter(0)
rd, err := readStarter(0, storiface.PaddedByteIndex(length))
if err != nil {
return xerrors.Errorf("starting reader: %w", err)
}
2 changes: 1 addition & 1 deletion itests/deals_partial_retrieval_dm-level_test.go
@@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) {
ctx := context.Background()

kit.QuietMiningLogs()
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs())
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
dh := kit.NewDealHarness(t, client, miner, miner)
ens.InterconnectAll().BeginMining(50 * time.Millisecond)

44 changes: 44 additions & 0 deletions lib/readerutil/readerutil.go
@@ -0,0 +1,44 @@
package readerutil

import (
"io"
"os"
)

// NewReadSeekerFromReaderAt returns a new io.ReadSeeker from a io.ReaderAt.
// The returned io.ReadSeeker will read from the io.ReaderAt starting at the
// given base offset.
func NewReadSeekerFromReaderAt(readerAt io.ReaderAt, base int64) io.ReadSeeker {
return &readSeekerFromReaderAt{
readerAt: readerAt,
base: base,
pos: 0,
}
}

type readSeekerFromReaderAt struct {
readerAt io.ReaderAt
base int64
pos int64
}

func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) {
n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base)
rs.pos += int64(n)
return n, err
}

func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
rs.pos = offset
case io.SeekCurrent:
rs.pos += offset
case io.SeekEnd:
return 0, io.ErrUnexpectedEOF
default:
return 0, os.ErrInvalid
}

return rs.pos, nil
}
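
For orientation, a minimal usage sketch of the new helper (not part of the diff; the file name and base offset below are illustrative):

package main

import (
    "fmt"
    "os"

    "github.com/filecoin-project/lotus/lib/readerutil"
)

func main() {
    // Hypothetical file; any io.ReaderAt works, including *os.File.
    f, err := os.Open("example.bin")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Every read/seek position is offset by 128 bytes into the underlying file.
    rs := readerutil.NewReadSeekerFromReaderAt(f, 128)

    buf := make([]byte, 32)
    n, err := rs.Read(buf) // reads bytes 128..159 of example.bin
    fmt.Println(n, err)
}
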
44 changes: 42 additions & 2 deletions metrics/metrics.go
@@ -61,6 +61,10 @@ var (

// vm execution
ExecutionLane, _ = tag.NewKey("lane")

// piecereader
PRReadType, _ = tag.NewKey("pr_type") // seq / rand
PRReadSize, _ = tag.NewKey("pr_size") // small / big
)

// Measures
@@ -153,15 +157,22 @@
SchedCycleOpenWindows = stats.Int64("sched/assigner_cycle_open_window", "Number of open windows in scheduling cycles", stats.UnitDimensionless)
SchedCycleQueueSize = stats.Int64("sched/assigner_cycle_task_queue_entry", "Number of task queue entries in scheduling cycles", stats.UnitDimensionless)

DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)

DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless)
DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)

DagStorePRAtHitBytes = stats.Int64("dagstore/pr_at_hit_bytes", "PieceReader ReadAt bytes from cache", stats.UnitBytes)
DagStorePRAtHitCount = stats.Int64("dagstore/pr_at_hit_count", "PieceReader ReadAt from cache hits", stats.UnitDimensionless)
DagStorePRAtCacheFillCount = stats.Int64("dagstore/pr_at_cache_fill_count", "PieceReader ReadAt full cache fill count", stats.UnitDimensionless)
DagStorePRAtReadBytes = stats.Int64("dagstore/pr_at_read_bytes", "PieceReader ReadAt bytes read from source", stats.UnitBytes) // PRReadSize tag
DagStorePRAtReadCount = stats.Int64("dagstore/pr_at_read_count", "PieceReader ReadAt reads from source", stats.UnitDimensionless) // PRReadSize tag

// splitstore
SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
@@ -487,6 +498,7 @@
DagStorePRBytesRequestedView = &view.View{
Measure: DagStorePRBytesRequested,
Aggregation: view.Sum(),
TagKeys: []tag.Key{PRReadType},
}
DagStorePRBytesDiscardedView = &view.View{
Measure: DagStorePRBytesDiscarded,
@@ -513,6 +525,29 @@
Aggregation: view.Sum(),
}

DagStorePRAtHitBytesView = &view.View{
Measure: DagStorePRAtHitBytes,
Aggregation: view.Sum(),
}
DagStorePRAtHitCountView = &view.View{
Measure: DagStorePRAtHitCount,
Aggregation: view.Count(),
}
DagStorePRAtCacheFillCountView = &view.View{
Measure: DagStorePRAtCacheFillCount,
Aggregation: view.Count(),
}
DagStorePRAtReadBytesView = &view.View{
Measure: DagStorePRAtReadBytes,
Aggregation: view.Sum(),
TagKeys: []tag.Key{PRReadSize},
}
DagStorePRAtReadCountView = &view.View{
Measure: DagStorePRAtReadCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{PRReadSize},
}

// splitstore
SplitstoreMissView = &view.View{
Measure: SplitstoreMiss,
@@ -779,6 +814,11 @@ var MinerNodeViews = append([]*view.View{
DagStorePRSeekForwardCountView,
DagStorePRSeekBackBytesView,
DagStorePRSeekForwardBytesView,
DagStorePRAtHitBytesView,
DagStorePRAtHitCountView,
DagStorePRAtCacheFillCountView,
DagStorePRAtReadBytesView,
DagStorePRAtReadCountView,
}, DefaultViews...)

var GatewayNodeViews = append([]*view.View{
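
For orientation, a sketch of how a call site might record the new ReadAt measures (not part of the diff; the helper name and the small/big threshold are assumptions, only the measure and tag names come from the change above):

package example

import (
    "context"

    "go.opencensus.io/stats"
    "go.opencensus.io/tag"

    "github.com/filecoin-project/lotus/metrics"
)

// recordSourceRead is illustrative only: it classifies a ReadAt that had to go
// to the underlying source as "small" or "big" via the PRReadSize tag, then
// bumps the ReadAt byte and count measures declared above.
func recordSourceRead(ctx context.Context, n int) {
    sizeClass := "small"
    if n >= 128<<10 { // threshold chosen for illustration, not taken from the commit
        sizeClass = "big"
    }
    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadSize, sizeClass))
    stats.Record(ctx,
        metrics.DagStorePRAtReadBytes.M(int64(n)),
        metrics.DagStorePRAtReadCount.M(1),
    )
}
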
3 changes: 2 additions & 1 deletion storage/paths/http_handler.go
@@ -3,6 +3,7 @@ package paths
import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"strconv"
@@ -35,7 +36,7 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of
return pf.HasAllocated(offset, size)
}

func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
return pf.Reader(offset, size)
}

4 changes: 2 additions & 2 deletions storage/paths/http_handler_test.go
@@ -207,8 +207,8 @@ func TestRemoteGetAllocated(t *testing.T) {
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)

handler := &paths.FetchHandler{
lstore,
pfhandler,
Local: lstore,
PfHandler: pfhandler,
}

// run http server
4 changes: 2 additions & 2 deletions storage/paths/interface.go
@@ -2,7 +2,7 @@ package paths

import (
"context"
"os"
"io"

"github.com/filecoin-project/go-state-types/abi"

@@ -24,7 +24,7 @@ type PartialFileHandler interface {
HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)

// Reader returns a file from which we can read the unsealed piece in the partial file.
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error)
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error)

// Close closes the partial file
Close(pf *partialfile.PartialFile) error
6 changes: 3 additions & 3 deletions storage/paths/mocks/pf.go

Generated file; diff not rendered by default.

97 changes: 66 additions & 31 deletions storage/paths/remote.go
@@ -14,20 +14,26 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var FetchTempSubdir = "fetching"

var CopyBuf = 1 << 20

// LocalReaderTimeout is the timeout for keeping local reader files open without
// any read activity.
var LocalReaderTimeout = 5 * time.Second

type Remote struct {
local Store
index SectorIndex
@@ -563,7 +569,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storiface.SectorRef, off
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed

// check if we have the unsealed sector file locally
@@ -602,20 +608,67 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)

return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil
// refs keep track of the currently opened pf
// if they drop to 0 for longer than LocalReaderTimeout, pf will be closed
var refsLk sync.Mutex
refs := 0

cleanupIdle := func() {
lastRefs := 1

for range time.After(LocalReaderTimeout) {
refsLk.Lock()
if refs == 0 && lastRefs == 0 && pf != nil { // pf can't really be nil here, but better be safe
log.Infow("closing idle partial file", "path", path)
err := pf.Close()
if err != nil {
log.Errorw("closing idle partial file", "path", path, "error", err)
}

pf = nil
refsLk.Unlock()
return
}
lastRefs = refs
refsLk.Unlock()
}
}

getPF := func() (*partialfile.PartialFile, func() error, error) {
refsLk.Lock()
defer refsLk.Unlock()

if pf == nil {
// got closed in the meantime, reopen

if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
var err error
pf, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
return nil, nil, xerrors.Errorf("reopening partial file: %w", err)
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
log.Debugf("local partial file reopened %s (+%d,%d)", path, offset, size)

go cleanupIdle()
}

r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
refs++

return pf, func() error {
refsLk.Lock()
defer refsLk.Unlock()

refs--
return nil
}, nil
}

return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
pf, done, err := getPF()
if err != nil {
return nil, xerrors.Errorf("getting partialfile handle: %w", err)
}

r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned))
if err != nil {
return nil, err
}
@@ -625,25 +678,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
io.Closer
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}

tmp := f
f = nil
return tmp.Close()
}

// otherwise stash it away for reuse
pf = f
return nil
}),
Closer: funcCloser(done),
}, nil
}, nil

@@ -689,10 +724,10 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
continue
}

return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), offset+abi.PaddedPieceSize(endOffsetAligned))
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
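
For orientation, a sketch of a caller using the new two-offset reader (not part of the diff; the wrapper function and variable names are assumptions): Remote.Reader now returns a function taking both a start and an end offset, and the local partial file behind it is reference-counted and closed only after LocalReaderTimeout of idleness, so disjoint ranges of the same unsealed piece can be read concurrently.

package example

import (
    "context"
    "io"
    "sync"

    "github.com/filecoin-project/go-state-types/abi"

    "github.com/filecoin-project/lotus/storage/paths"
    "github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// readTwoRangesInParallel is an illustrative sketch, not code from this commit.
func readTwoRangesInParallel(ctx context.Context, r *paths.Remote, s storiface.SectorRef, pieceOff, pieceSize abi.PaddedPieceSize) error {
    readStarter, err := r.Reader(ctx, s, pieceOff, pieceSize)
    if err != nil || readStarter == nil {
        return err
    }

    half := storiface.PaddedByteIndex(pieceSize / 2)
    ranges := [][2]storiface.PaddedByteIndex{{0, half}, {half, storiface.PaddedByteIndex(pieceSize)}}

    var wg sync.WaitGroup
    for _, rng := range ranges {
        rng := rng
        wg.Add(1)
        go func() {
            defer wg.Done()
            rc, err := readStarter(rng[0], rng[1]) // ranged read; safe to call concurrently
            if err != nil {
                return
            }
            defer rc.Close() // drops the ref; the file closes after LocalReaderTimeout of idleness
            _, _ = io.Copy(io.Discard, rc)
        }()
    }
    wg.Wait()
    return nil
}
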
4 changes: 2 additions & 2 deletions storage/paths/remote_test.go
@@ -477,7 +477,7 @@ func TestReader(t *testing.T) {
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
rd, err = rdg(0, storiface.PaddedByteIndex(size))
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
@@ -490,7 +490,7 @@
require.Nil(t, rd)
} else {
require.NotNil(t, rdg)
rd, err := rdg(0)
rd, err := rdg(0, storiface.PaddedByteIndex(size))
require.NoError(t, err)

defer func() {
(Remaining changed files not rendered.)
