diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go index 4f048ad1750..df467e66a94 100644 --- a/cmd/lotus-miner/sealing.go +++ b/cmd/lotus-miner/sealing.go @@ -4,20 +4,27 @@ import ( "encoding/hex" "encoding/json" "fmt" + "io" "math" + "net/http" "os" "sort" "strings" "text/tabwriter" "time" + "github.com/dustin/go-humanize" "github.com/fatih/color" "github.com/google/uuid" + "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/lib/httpreader" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" @@ -31,6 +38,7 @@ var sealingCmd = &cli.Command{ workersCmd(true), sealingSchedDiagCmd, sealingAbortCmd, + sealingDataCidCmd, }, } @@ -349,3 +357,94 @@ var sealingAbortCmd = &cli.Command{ return nodeApi.SealingAbort(ctx, job.ID) }, } + +var sealingDataCidCmd = &cli.Command{ + Name: "data-cid", + Usage: "Compute data CID using workers", + ArgsUsage: "[file/url] ", + Flags: []cli.Flag{ + &cli.Uint64Flag{ + Name: "file-size", + Usage: "real file size", + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() < 1 || cctx.Args().Len() > 2 { + return xerrors.Errorf("expected 1 or 2 arguments") + } + + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + var r io.Reader + sz := cctx.Uint64("file-size") + + if strings.HasPrefix(cctx.Args().First(), "http://") || strings.HasPrefix(cctx.Args().First(), "https://") { + r = &httpreader.HttpReader{ + URL: cctx.Args().First(), + } + + if !cctx.IsSet("file-size") { + resp, err := http.Head(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("http head: %w", err) + } + + if resp.ContentLength < 0 { + return xerrors.Errorf("head response didn't contain content length; specify --file-size") + } + sz = uint64(resp.ContentLength) + } + } else { + p, err := homedir.Expand(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("expanding path: %w", err) + } + + f, err := os.OpenFile(p, os.O_RDONLY, 0) + if err != nil { + return xerrors.Errorf("opening source file: %w", err) + } + + if !cctx.IsSet("file-size") { + st, err := f.Stat() + if err != nil { + return xerrors.Errorf("stat: %w", err) + } + sz = uint64(st.Size()) + } + + r = f + } + + var psize abi.PaddedPieceSize + if cctx.Args().Len() == 2 { + rps, err := humanize.ParseBytes(cctx.Args().Get(1)) + if err != nil { + return xerrors.Errorf("parsing piece size: %w", err) + } + psize = abi.PaddedPieceSize(rps) + if err := psize.Validate(); err != nil { + return xerrors.Errorf("checking piece size: %w", err) + } + if sz > uint64(psize.Unpadded()) { + return xerrors.Errorf("file larger than the piece") + } + } else { + psize = padreader.PaddedSize(sz).Padded() + } + + pc, err := nodeApi.ComputeDataCid(ctx, psize.Unpadded(), r) + if err != nil { + return xerrors.Errorf("computing data CID: %w", err) + } + + fmt.Println(pc.PieceCID, " ", pc.Size) + return nil + }, +} diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 73c996348e6..4122026f86d 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -2331,6 +2331,7 @@ COMMANDS: workers list workers sched-diag Dump internal scheduler state abort Abort a running job + data-cid Compute data CID using workers help, h Shows a list of commands or help for one command OPTIONS: @@ -2393,3 +2394,17 @@ OPTIONS: --help, -h show help (default: false) ``` + +### lotus-miner sealing data-cid +``` +NAME: + lotus-miner sealing data-cid - Compute data CID using workers + +USAGE: + lotus-miner sealing data-cid [command options] [file/url] + +OPTIONS: + --file-size value real file size (default: 0) + --help, -h show help (default: false) + +``` diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index 79de4fdb964..d63e83c6e8f 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -32,6 +32,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/partialfile" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader" + "github.com/filecoin-project/lotus/lib/nullreader" ) var _ Storage = &Sealer{} @@ -53,6 +54,11 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error } func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + pieceData = io.LimitReader(io.MultiReader( + pieceData, + nullreader.Reader{}, + ), int64(pieceSize)) + // TODO: allow tuning those: chunk := abi.PaddedPieceSize(4 << 20) parallel := runtime.NumCPU() @@ -73,6 +79,7 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, for { var read int for rbuf := buf; len(rbuf) > 0; { + n, err := pieceData.Read(rbuf) if err != nil && err != io.EOF { return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err) diff --git a/itests/worker_test.go b/itests/worker_test.go index c1fba26007f..0d20c2f1973 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -49,17 +49,23 @@ func TestWorkerDataCid(t *testing.T) { e, err := worker.Enabled(ctx) require.NoError(t, err) require.True(t, e) - /* - pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016))) - require.NoError(t, err) - require.Equal(t, abi.PaddedPieceSize(1024), pi.Size) - require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String()) - */ + + pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016))) + require.NoError(t, err) + require.Equal(t, abi.PaddedPieceSize(1024), pi.Size) + require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String()) + bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded() - pi, err := miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece)))) + pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece)))) require.NoError(t, err) require.Equal(t, bigPiece.Padded(), pi.Size) require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String()) + + nonFullPiece := abi.PaddedPieceSize(10 << 20).Unpadded() + pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(nonFullPiece)))) + require.NoError(t, err) + require.Equal(t, bigPiece.Padded(), pi.Size) + require.Equal(t, "baga6ea4seaqbxib4pdxs5cqdn3fmtj4rcxk6rx6ztiqmrx7fcpo3ymuxbp2rodi", pi.PieceCID.String()) } func TestWinningPostWorker(t *testing.T) { diff --git a/lib/httpreader/httpreader.go b/lib/httpreader/httpreader.go new file mode 100644 index 00000000000..62338e76ec8 --- /dev/null +++ b/lib/httpreader/httpreader.go @@ -0,0 +1,47 @@ +package httpreader + +import ( + "io" + "net/http" + + "golang.org/x/xerrors" +) + +// HttpReader is a reader which will read a http resource with a simple get request. +// Before first Read it will be passed over JsonRPC as a URL. +type HttpReader struct { + URL string + + reader io.ReadCloser +} + +func (h *HttpReader) Close() error { + h.URL = "" + if h.reader != nil { + return h.reader.Close() + } + return nil +} + +func (h *HttpReader) Read(p []byte) (n int, err error) { + if h.reader == nil { + res, err := http.Get(h.URL) + if err != nil { + return 0, err + } + if res.StatusCode != http.StatusOK { + return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode) + } + + // mark the reader as reading + h.URL = "" + h.reader = res.Body + } + if h.reader == nil { + return 0, xerrors.Errorf("http reader closed") + } + + return h.reader.Read(p) +} + +var _ io.ReadCloser = &HttpReader{} diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 4e3ebb8c203..a4f16982e05 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader" + "github.com/filecoin-project/lotus/lib/httpreader" ) var log = logging.Logger("rpcenc") @@ -34,6 +35,7 @@ type StreamType string const ( Null StreamType = "null" PushStream StreamType = "push" + HTTP StreamType = "http" // TODO: Data transfer handoff to workers? ) @@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { if r, ok := r.(*nullreader.NullReader); ok { return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil } + if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" { + return reflect.ValueOf(ReaderStream{Type: HTTP, Info: r.URL}), nil + } reqID := uuid.New() u, err := url.Parse(addr) @@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err) } - if rs.Type == Null { + switch rs.Type { + case Null: n, err := strconv.ParseInt(rs.Info, 10, 64) if err != nil { return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err) } return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil + case HTTP: + return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil } u, err := uuid.Parse(rs.Info)