Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: miner cli: sealing data-cid command #8715

Merged
merged 6 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions cmd/lotus-miner/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"os"
"sort"
"strings"
"text/tabwriter"
"time"

"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/lib/httpreader"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
Expand All @@ -31,6 +38,7 @@ var sealingCmd = &cli.Command{
workersCmd(true),
sealingSchedDiagCmd,
sealingAbortCmd,
sealingDataCidCmd,
},
}

Expand Down Expand Up @@ -349,3 +357,94 @@ var sealingAbortCmd = &cli.Command{
return nodeApi.SealingAbort(ctx, job.ID)
},
}

var sealingDataCidCmd = &cli.Command{
Name: "data-cid",
Usage: "Compute data CID using workers",
ArgsUsage: "[file/url] <padded piece size>",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "file-size",
Usage: "real file size",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 1 || cctx.Args().Len() > 2 {
return xerrors.Errorf("expected 1 or 2 arguments")
}

nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

var r io.Reader
sz := cctx.Uint64("file-size")

if strings.HasPrefix(cctx.Args().First(), "http://") || strings.HasPrefix(cctx.Args().First(), "https://") {
r = &httpreader.HttpReader{
URL: cctx.Args().First(),
}

if !cctx.IsSet("file-size") {
resp, err := http.Head(cctx.Args().First())
if err != nil {
return xerrors.Errorf("http head: %w", err)
}

if resp.ContentLength < 0 {
return xerrors.Errorf("head response didn't contain content length; specify --file-size")
}
sz = uint64(resp.ContentLength)
}
} else {
p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}

f, err := os.OpenFile(p, os.O_RDONLY, 0)
if err != nil {
return xerrors.Errorf("opening source file: %w", err)
}

if !cctx.IsSet("file-size") {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat: %w", err)
}
sz = uint64(st.Size())
}

r = f
}

var psize abi.PaddedPieceSize
if cctx.Args().Len() == 2 {
rps, err := humanize.ParseBytes(cctx.Args().Get(1))
if err != nil {
return xerrors.Errorf("parsing piece size: %w", err)
}
psize = abi.PaddedPieceSize(rps)
if err := psize.Validate(); err != nil {
return xerrors.Errorf("checking piece size: %w", err)
}
if sz > uint64(psize.Unpadded()) {
return xerrors.Errorf("file larger than the piece")
}
} else {
psize = padreader.PaddedSize(sz).Padded()
}

pc, err := nodeApi.ComputeDataCid(ctx, psize.Unpadded(), r)
if err != nil {
return xerrors.Errorf("computing data CID: %w", err)
}

fmt.Println(pc.PieceCID, " ", pc.Size)
return nil
},
}
15 changes: 15 additions & 0 deletions documentation/en/cli-lotus-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,7 @@ COMMANDS:
workers list workers
sched-diag Dump internal scheduler state
abort Abort a running job
data-cid Compute data CID using workers
help, h Shows a list of commands or help for one command

OPTIONS:
Expand Down Expand Up @@ -2393,3 +2394,17 @@ OPTIONS:
--help, -h show help (default: false)

```

### lotus-miner sealing data-cid
```
NAME:
lotus-miner sealing data-cid - Compute data CID using workers

USAGE:
lotus-miner sealing data-cid [command options] [file/url] <padded piece size>

OPTIONS:
--file-size value real file size (default: 0)
--help, -h show help (default: false)

```
7 changes: 7 additions & 0 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"github.com/filecoin-project/lotus/lib/nullreader"
)

var _ Storage = &Sealer{}
Expand All @@ -53,6 +54,11 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
}

func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
pieceData = io.LimitReader(io.MultiReader(
pieceData,
nullreader.Reader{},
), int64(pieceSize))

// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
Expand All @@ -73,6 +79,7 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
for {
var read int
for rbuf := buf; len(rbuf) > 0; {

n, err := pieceData.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
Expand Down
20 changes: 13 additions & 7 deletions itests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,23 @@ func TestWorkerDataCid(t *testing.T) {
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
/*
pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())
*/

pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())

bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded()
pi, err := miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
require.NoError(t, err)
require.Equal(t, bigPiece.Padded(), pi.Size)
require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String())

nonFullPiece := abi.PaddedPieceSize(10 << 20).Unpadded()
pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(nonFullPiece))))
require.NoError(t, err)
require.Equal(t, bigPiece.Padded(), pi.Size)
require.Equal(t, "baga6ea4seaqbxib4pdxs5cqdn3fmtj4rcxk6rx6ztiqmrx7fcpo3ymuxbp2rodi", pi.PieceCID.String())
}

func TestWinningPostWorker(t *testing.T) {
Expand Down
47 changes: 47 additions & 0 deletions lib/httpreader/httpreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package httpreader

import (
"io"
"net/http"

"golang.org/x/xerrors"
)

// HttpReader is a reader which will read a http resource with a simple get request.
// Before first Read it will be passed over JsonRPC as a URL.
type HttpReader struct {
URL string

reader io.ReadCloser
}

func (h *HttpReader) Close() error {
h.URL = ""
if h.reader != nil {
return h.reader.Close()
}
return nil
}

func (h *HttpReader) Read(p []byte) (n int, err error) {
if h.reader == nil {
res, err := http.Get(h.URL)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode)
}

// mark the reader as reading
h.URL = ""
h.reader = res.Body
}
if h.reader == nil {
return 0, xerrors.Errorf("http reader closed")
}

return h.reader.Read(p)
}

var _ io.ReadCloser = &HttpReader{}
10 changes: 9 additions & 1 deletion lib/rpcenc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"github.com/filecoin-project/lotus/lib/httpreader"
)

var log = logging.Logger("rpcenc")
Expand All @@ -34,6 +35,7 @@ type StreamType string
const (
Null StreamType = "null"
PushStream StreamType = "push"
HTTP StreamType = "http"
// TODO: Data transfer handoff to workers?
)

Expand Down Expand Up @@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
if r, ok := r.(*nullreader.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
}
if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" {
return reflect.ValueOf(ReaderStream{Type: HTTP, Info: r.URL}), nil
}

reqID := uuid.New()
u, err := url.Parse(addr)
Expand Down Expand Up @@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
}

if rs.Type == Null {
switch rs.Type {
case Null:
n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}

return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil
case HTTP:
return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil
}

u, err := uuid.Parse(rs.Info)
Expand Down