Skip to content

Commit

Permalink
client deal: Cache CommD when creating multiple deals
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 22, 2020
1 parent 8cad245 commit 924005a
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 2 deletions.
8 changes: 8 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ type FullNode interface {
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error)
// ClientCalcCommP calculates the CommP and data size of the specified CID
ClientDealPieceCID(ctx context.Context, root cid.Cid) (DataCIDSize, error)
// ClientCalcCommP calculates the CommP for a specified file
ClientCalcCommP(ctx context.Context, inpath string) (*CommPRet, error)
// ClientGenCar generates a CAR file for the specified file.
Expand Down Expand Up @@ -876,6 +878,12 @@ type DataSize struct {
PieceSize abi.PaddedPieceSize
}

type DataCIDSize struct {
PayloadSize int64
PieceSize abi.PaddedPieceSize
PieceCID cid.Cid
}

type CommPRet struct {
Root cid.Cid
Size abi.UnpaddedPieceSize
Expand Down
6 changes: 6 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type FullNodeStruct struct {
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientDealPieceCID func(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
Expand Down Expand Up @@ -535,6 +536,11 @@ func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner)
}

func (c *FullNodeStruct) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
return c.Internal.ClientDealPieceCID(ctx, root)
}

func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
return c.Internal.ClientCalcCommP(ctx, inpath)
}
Expand Down
7 changes: 5 additions & 2 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func interactiveDeal(cctx *cli.Context) error {
var dur time.Duration
var epochs abi.ChainEpoch
var verified bool
var ds lapi.DataSize
var ds lapi.DataCIDSize

// find
var candidateAsks []*storagemarket.StorageAsk
Expand Down Expand Up @@ -553,7 +553,7 @@ uiLoop:
}

color.Blue(".. calculating data size\n")
ds, err = api.ClientDealSize(ctx, data)
ds, err = api.ClientDealPieceCID(ctx, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -843,6 +843,9 @@ uiLoop:
Data: &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync,
Root: data,

PieceCid: &ds.PieceCID,
PieceSize: ds.PieceSize.Unpadded(),
},
Wallet: a,
Miner: maddr,
Expand Down
119 changes: 119 additions & 0 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package client

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"math/bits"
"os"

"github.com/filecoin-project/go-state-types/dline"
Expand Down Expand Up @@ -31,6 +34,7 @@ import (
mh "github.com/multiformats/go-multihash"
"go.uber.org/fx"

ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/discovery"
Expand All @@ -44,6 +48,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
marketevents "github.com/filecoin-project/lotus/markets/loggers"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -709,6 +714,120 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e
}, nil
}

const commPBufPad = abi.PaddedPieceSize(8 << 20)
const commPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const

type commPWriter struct {
len int64
buf [commPBuf]byte
leaves []cid.Cid
}

func (w *commPWriter) Write(p []byte) (int, error) {
n := len(p)
for len(p) > 0 {
buffered := int(w.len % int64(len(w.buf)))
toBuffer := len(w.buf) - buffered
if toBuffer > len(p) {
toBuffer = len(p)
}

copied := copy(w.buf[buffered:], p[:toBuffer])
p = p[copied:]
w.len += int64(copied)

if copied > 0 && w.len%int64(len(w.buf)) == 0 {
leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), commPBuf)
if err != nil {
return 0, err
}
w.leaves = append(w.leaves, leaf)
}
}
return n, nil
}

func (w *commPWriter) Sum() (api.DataCIDSize, error) {
// process last non-zero leaf if exists
lastLen := w.len % int64(len(w.buf))
rawLen := w.len

// process remaining bit of data
if lastLen != 0 {
if len(w.leaves) != 0 {
copy(w.buf[lastLen:], make([]byte, int(int64(commPBuf)-lastLen)))
lastLen = int64(commPBuf)
}

r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen))
p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz)
if err != nil {
return api.DataCIDSize{}, err
}

if sz < commPBuf { // special case for pieces smaller than 16MiB
return api.DataCIDSize{
PayloadSize: w.len,
PieceSize: sz.Padded(),
PieceCID: p,
}, nil
}

w.leaves = append(w.leaves, p)
}

// pad with zero pieces to power-of-two size
fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves)
for i := 0; i < fillerLeaves; i++ {
w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(commPBuf))
}

if len(w.leaves) == 1 {
return api.DataCIDSize{
PayloadSize: rawLen,
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
PieceCID: w.leaves[0],
}, nil
}

pieces := make([]abi.PieceInfo, len(w.leaves))
for i, leaf := range w.leaves {
pieces[i] = abi.PieceInfo{
Size: commPBufPad,
PieceCID: leaf,
}
}

p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err)
}

return api.DataCIDSize{
PayloadSize: rawLen,
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
PieceCID: p,
}, nil
}

func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))

w := &commPWriter{}
bw := bufio.NewWriterSize(w, int(commPBuf))

err := car.WriteCar(ctx, dag, []cid.Cid{root}, w)
if err != nil {
return api.DataCIDSize{}, err
}

if err := bw.Flush(); err != nil {
return api.DataCIDSize{}, err
}

return w.Sum()
}

func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
id, st, err := a.imgr().NewStore()
if err != nil {
Expand Down
87 changes: 87 additions & 0 deletions node/impl/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package client

import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)

func TestClientDealPieceCIDZero(t *testing.T) {
for i, s := range []struct {
writes []int
expect abi.PaddedPieceSize
}{
{writes: []int{200}, expect: 256},
{writes: []int{200, 200}, expect: 512},

{writes: []int{int(commPBuf)}, expect: commPBufPad},
{writes: []int{int(commPBuf) * 2}, expect: 2 * commPBufPad},
{writes: []int{int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 4 * commPBufPad},
{writes: []int{int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 16 * commPBufPad},

{writes: []int{200, int(commPBuf)}, expect: 2 * commPBufPad},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
w := &commPWriter{}
var rawSum int64
for _, write := range s.writes {
rawSum += int64(write)
_, err := w.Write(make([]byte, write))
require.NoError(t, err)
}

p, err := w.Sum()
require.NoError(t, err)
require.Equal(t, rawSum, p.PayloadSize)
require.Equal(t, s.expect, p.PieceSize)
require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String())
})
}
}

func TestClientDealPieceCIDData(t *testing.T) {
dataLen := float64(commPBuf) * 6.78
data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen)))

pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen))
exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz)
require.NoError(t, err)

w := &commPWriter{}
_, err = io.Copy(w, bytes.NewReader(data))
require.NoError(t, err)

res, err := w.Sum()
require.NoError(t, err)

require.Equal(t, exp.String(), res.PieceCID.String())
}

func BenchmarkClientDealPieceCIDZero(b *testing.B) {
buf := make([]byte, int(commPBuf)*b.N)
b.SetBytes(int64(commPBuf))
b.ResetTimer()

w := &commPWriter{}

_, err := w.Write(buf)
require.NoError(b, err)
o, err := w.Sum()

b.StopTimer()

require.NoError(b, err)
require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String())
require.Equal(b, int64(commPBuf)*int64(b.N), o.PayloadSize)
}

0 comments on commit 924005a

Please sign in to comment.