Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: piecereader: Allow parallel access #10895

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/lotus-shed/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ fr32 padding is removed from the output.`,
return xerrors.Errorf("getting reader: %w", err)
}

rd, err := readStarter(0)
rd, err := readStarter(0, storiface.PaddedByteIndex(length))
if err != nil {
return xerrors.Errorf("starting reader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion itests/deals_partial_retrieval_dm-level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) {
ctx := context.Background()

kit.QuietMiningLogs()
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs())
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
dh := kit.NewDealHarness(t, client, miner, miner)
ens.InterconnectAll().BeginMining(50 * time.Millisecond)

Expand Down
44 changes: 44 additions & 0 deletions lib/readerutil/readerutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package readerutil

import (
"io"
"os"
)

// NewReadSeekerFromReaderAt returns a new io.ReadSeeker from an io.ReaderAt.
// The returned io.ReadSeeker will read from the io.ReaderAt starting at the
// given base offset, i.e. position 0 of the returned reader maps to offset
// base of readerAt.
//
// Every returned reader tracks its own position. A single returned value does
// no locking of its own and must not be shared between goroutines without
// external synchronization.
func NewReadSeekerFromReaderAt(readerAt io.ReaderAt, base int64) io.ReadSeeker {
	return &readSeekerFromReaderAt{
		readerAt: readerAt,
		base:     base,
	}
}

// readSeekerFromReaderAt adapts an io.ReaderAt to io.ReadSeeker by keeping a
// private cursor on top of the stateless ReadAt calls.
type readSeekerFromReaderAt struct {
	readerAt io.ReaderAt
	base     int64 // offset in readerAt that corresponds to position 0
	pos      int64 // current position relative to base; never negative
}

var _ io.ReadSeeker = (*readSeekerFromReaderAt)(nil)

// Read reads up to len(p) bytes at the current position and advances the
// position by the number of bytes read. The error returned by the underlying
// ReadAt call is passed through unchanged.
func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) {
	n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base)
	rs.pos += int64(n)
	return n, err
}

// Seek sets the position for the next Read and returns the new position
// relative to the base offset.
//
// io.SeekStart and io.SeekCurrent are supported. io.SeekEnd is not supported,
// because an io.ReaderAt does not expose its length, and yields
// io.ErrUnexpectedEOF. An unknown whence, or a seek that would move the
// position before the start, yields os.ErrInvalid. A failed Seek leaves the
// current position untouched.
func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) {
	var next int64

	switch whence {
	case io.SeekStart:
		next = offset
	case io.SeekCurrent:
		next = rs.pos + offset
	case io.SeekEnd:
		return 0, io.ErrUnexpectedEOF
	default:
		return 0, os.ErrInvalid
	}

	// io.Seeker: seeking to an offset before the start is an error. Since pos
	// is never negative, this check also catches an int64 wrap-around in the
	// io.SeekCurrent addition above.
	if next < 0 {
		return 0, os.ErrInvalid
	}

	rs.pos = next
	return rs.pos, nil
}
3 changes: 2 additions & 1 deletion storage/paths/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package paths
import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -35,7 +36,7 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of
return pf.HasAllocated(offset, size)
}

func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
return pf.Reader(offset, size)
}

Expand Down
4 changes: 2 additions & 2 deletions storage/paths/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func TestRemoteGetAllocated(t *testing.T) {
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)

handler := &paths.FetchHandler{
lstore,
pfhandler,
Local: lstore,
PfHandler: pfhandler,
}

// run http server
Expand Down
4 changes: 2 additions & 2 deletions storage/paths/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package paths

import (
"context"
"os"
"io"

"github.com/filecoin-project/go-state-types/abi"

Expand All @@ -24,7 +24,7 @@ type PartialFileHandler interface {
HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)

// Reader returns a reader from which we can read the unsealed piece in the partial file.
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error)
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error)

// Close closes the partial file
Close(pf *partialfile.PartialFile) error
Expand Down
6 changes: 3 additions & 3 deletions storage/paths/mocks/pf.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 6 additions & 33 deletions storage/paths/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storiface.SectorRef, off
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed

// check if we have the unsealed sector file locally
Expand Down Expand Up @@ -602,20 +602,8 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)

return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil

if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
}

r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned))
if err != nil {
return nil, err
}
Expand All @@ -626,22 +614,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}

tmp := f
f = nil
return tmp.Close()
}

// otherwise stash it away for reuse
pf = f
// todo keep some refcount, close pf (or push to some lru) when it hits 0
return nil
}),
}, nil
Expand Down Expand Up @@ -689,10 +662,10 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
continue
}

return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), offset+abi.PaddedPieceSize(endOffsetAligned))
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions storage/paths/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestReader(t *testing.T) {
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
rd, err = rdg(0, storiface.PaddedByteIndex(size))
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
Expand All @@ -490,7 +490,7 @@ func TestReader(t *testing.T) {
require.Nil(t, rd)
} else {
require.NotNil(t, rdg)
rd, err := rdg(0)
rd, err := rdg(0, storiface.PaddedByteIndex(size))
require.NoError(t, err)

defer func() {
Expand Down
8 changes: 6 additions & 2 deletions storage/sealer/partialfile/partialfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/lib/readerutil"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
Expand Down Expand Up @@ -249,7 +250,10 @@ func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return nil
}

func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
// Reader forks off a new reader from the underlying file, and returns a reader
// starting at the given offset and reading the given size. Safe for concurrent
// use.
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
Expand All @@ -275,7 +279,7 @@ func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
}
}

return pf.file, nil
return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil
}

func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {
Expand Down
24 changes: 15 additions & 9 deletions storage/sealer/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storiface.SectorR
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
Expand All @@ -82,7 +83,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
Expand All @@ -93,27 +94,31 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, nil
}

buf := make([]byte, fr32.BufSize(size.Padded()))

pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
getReader: func(ctx context.Context, startOffset, readSize uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
startOffsetDiff := int(startOffset - uint64(startOffsetAligned))

endOffset := startOffset + readSize
endOffsetAligned := storiface.UnpaddedByteIndex((endOffset + 126) / 127 * 127) // ceil to multiple of 127

r, err := rg(startOffsetAligned.Padded())
r, err := rg(startOffsetAligned.Padded(), endOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}

upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
buf := pool.Get(fr32.BufSize(pieceSize.Padded()))

upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
if _, err := bir.Discard(startOffsetDiff); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
Expand All @@ -125,11 +130,12 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
}{
Reader: bir,
Closer: funcCloser(func() error {
pool.Put(buf)
return r.Close()
}),
}, nil
},
len: size,
len: pieceSize,
onClose: cancel,
pieceCid: pc,
}).init()
Expand Down
Loading