diff --git a/api/api_full.go b/api/api_full.go index 57ca5bcfd42..f2e7ecf6bdf 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -290,6 +290,8 @@ type FullNode interface { ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) // ClientQueryAsk returns a signed StorageAsk from the specified miner. ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) + // ClientCalcCommP calculates the CommP and data size of the specified CID + ClientDealPieceCID(ctx context.Context, root cid.Cid) (DataCIDSize, error) // ClientCalcCommP calculates the CommP for a specified file ClientCalcCommP(ctx context.Context, inpath string) (*CommPRet, error) // ClientGenCar generates a CAR file for the specified file. @@ -876,6 +878,12 @@ type DataSize struct { PieceSize abi.PaddedPieceSize } +type DataCIDSize struct { + PayloadSize int64 + PieceSize abi.PaddedPieceSize + PieceCID cid.Cid +} + type CommPRet struct { Root cid.Cid Size abi.UnpaddedPieceSize diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index e302aa8dc16..36c7124d1ac 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -161,6 +161,7 @@ type FullNodeStruct struct { ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) `perm:"read"` + ClientDealPieceCID func(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) `perm:"read"` ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` @@ -535,6 +536,11 @@ func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { return c.Internal.ClientQueryAsk(ctx, p, miner) } + +func (c *FullNodeStruct) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { + return c.Internal.ClientDealPieceCID(ctx, root) +} + func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) { return c.Internal.ClientCalcCommP(ctx, inpath) } diff --git a/cli/client.go b/cli/client.go index 01de8801c04..4df17dce3dd 100644 --- a/cli/client.go +++ b/cli/client.go @@ -489,7 +489,7 @@ func interactiveDeal(cctx *cli.Context) error { var dur time.Duration var epochs abi.ChainEpoch var verified bool - var ds lapi.DataSize + var ds lapi.DataCIDSize // find var candidateAsks []*storagemarket.StorageAsk @@ -553,7 +553,7 @@ uiLoop: } color.Blue(".. calculating data size\n") - ds, err = api.ClientDealSize(ctx, data) + ds, err = api.ClientDealPieceCID(ctx, data) if err != nil { return err } @@ -843,6 +843,9 @@ uiLoop: Data: &storagemarket.DataRef{ TransferType: storagemarket.TTGraphsync, Root: data, + + PieceCid: &ds.PieceCID, + PieceSize: ds.PieceSize.Unpadded(), }, Wallet: a, Miner: maddr, diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f04d310ce96..63158e58134 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -1,9 +1,12 @@ package client import ( + "bufio" + "bytes" "context" "fmt" "io" + "math/bits" "os" "github.com/filecoin-project/go-state-types/dline" @@ -31,6 +34,7 @@ import ( mh "github.com/multiformats/go-multihash" "go.uber.org/fx" + ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/discovery" @@ -44,6 +48,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/api" @@ -709,6 +714,120 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e }, nil } +const commPBufPad = abi.PaddedPieceSize(8 << 20) +const commPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const + +type commPWriter struct { + len int64 + buf [commPBuf]byte + leaves []cid.Cid +} + +func (w *commPWriter) Write(p []byte) (int, error) { + n := len(p) + for len(p) > 0 { + buffered := int(w.len % int64(len(w.buf))) + toBuffer := len(w.buf) - buffered + if toBuffer > len(p) { + toBuffer = len(p) + } + + copied := copy(w.buf[buffered:], p[:toBuffer]) + p = p[copied:] + w.len += int64(copied) + + if copied > 0 && w.len%int64(len(w.buf)) == 0 { + leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), commPBuf) + if err != nil { + return 0, err + } + w.leaves = append(w.leaves, leaf) + } + } + return n, nil +} + +func (w *commPWriter) Sum() (api.DataCIDSize, error) { + // process last non-zero leaf if exists + lastLen := w.len % int64(len(w.buf)) + rawLen := w.len + + // process remaining bit of data + if lastLen != 0 { + if len(w.leaves) != 0 { + copy(w.buf[lastLen:], make([]byte, int(int64(commPBuf)-lastLen))) + lastLen = int64(commPBuf) + } + + r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen)) + p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz) + if err != nil { + return api.DataCIDSize{}, err + } + + if sz < commPBuf { // special case for pieces smaller than 16MiB + return api.DataCIDSize{ + PayloadSize: w.len, + PieceSize: sz.Padded(), + PieceCID: p, + }, nil + } + + w.leaves = append(w.leaves, p) + } + + // pad with zero pieces to power-of-two size + fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves) + for i := 0; i < fillerLeaves; i++ { + w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(commPBuf)) + } + + if len(w.leaves) == 1 { + return api.DataCIDSize{ + PayloadSize: rawLen, + PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, + PieceCID: w.leaves[0], + }, nil + } + + pieces := make([]abi.PieceInfo, len(w.leaves)) + for i, leaf := range w.leaves { + pieces[i] = abi.PieceInfo{ + Size: commPBufPad, + PieceCID: leaf, + } + } + + p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces) + if err != nil { + return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err) + } + + return api.DataCIDSize{ + PayloadSize: rawLen, + PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, + PieceCID: p, + }, nil +} + +func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { + dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore))) + + w := &commPWriter{} + bw := bufio.NewWriterSize(w, int(commPBuf)) + + err := car.WriteCar(ctx, dag, []cid.Cid{root}, w) + if err != nil { + return api.DataCIDSize{}, err + } + + if err := bw.Flush(); err != nil { + return api.DataCIDSize{}, err + } + + return w.Sum() +} + func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { id, st, err := a.imgr().NewStore() if err != nil { diff --git a/node/impl/client/client_test.go b/node/impl/client/client_test.go new file mode 100644 index 00000000000..15dc133f078 --- /dev/null +++ b/node/impl/client/client_test.go @@ -0,0 +1,87 @@ +package client + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" +) + +func TestClientDealPieceCIDZero(t *testing.T) { + for i, s := range []struct { + writes []int + expect abi.PaddedPieceSize + }{ + {writes: []int{200}, expect: 256}, + {writes: []int{200, 200}, expect: 512}, + + {writes: []int{int(commPBuf)}, expect: commPBufPad}, + {writes: []int{int(commPBuf) * 2}, expect: 2 * commPBufPad}, + {writes: []int{int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 4 * commPBufPad}, + {writes: []int{int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 16 * commPBufPad}, + + {writes: []int{200, int(commPBuf)}, expect: 2 * commPBufPad}, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + w := &commPWriter{} + var rawSum int64 + for _, write := range s.writes { + rawSum += int64(write) + _, err := w.Write(make([]byte, write)) + require.NoError(t, err) + } + + p, err := w.Sum() + require.NoError(t, err) + require.Equal(t, rawSum, p.PayloadSize) + require.Equal(t, s.expect, p.PieceSize) + require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String()) + }) + } +} + +func TestClientDealPieceCIDData(t *testing.T) { + dataLen := float64(commPBuf) * 6.78 + data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen))) + + pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen)) + exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz) + require.NoError(t, err) + + w := &commPWriter{} + _, err = io.Copy(w, bytes.NewReader(data)) + require.NoError(t, err) + + res, err := w.Sum() + require.NoError(t, err) + + require.Equal(t, exp.String(), res.PieceCID.String()) +} + +func BenchmarkClientDealPieceCIDZero(b *testing.B) { + buf := make([]byte, int(commPBuf)*b.N) + b.SetBytes(int64(commPBuf)) + b.ResetTimer() + + w := &commPWriter{} + + _, err := w.Write(buf) + require.NoError(b, err) + o, err := w.Sum() + + b.StopTimer() + + require.NoError(b, err) + require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String()) + require.Equal(b, int64(commPBuf)*int64(b.N), o.PayloadSize) +}