Skip to content

Commit

Permalink
feat: unsealed from sp through venus-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
diwufeiwen committed Mar 20, 2023
1 parent b2bde82 commit e4b0c54
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 46 deletions.
87 changes: 82 additions & 5 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"

gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand All @@ -35,17 +37,28 @@ type marketAPI struct {
useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient

throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI {
func NewMarketAPI(
ctx metrics.MetricsCtx,
repo repo.Repo,
pieceStorageMgr *piecestorage.PieceStorageManager,
gatewayMarketClient gatewayAPIV2.IMarketClient,
useTransient bool,
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
gatewayMarketClient: gatewayMarketClient,

throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -56,11 +69,75 @@ func (m *marketAPI) Start(_ context.Context) error {
func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err)
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// todo ProofType can not be passed, SP processes itself?
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
}

return nil
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
}

// we don't have an unsealed sector containing the piece
return false, nil
}

return true, nil
// todo check isunseal from miner
// m.gatewayMarketClient.IsUnsealed()
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
Expand Down
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)

// todo: mock IMarketEvent
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false)
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100)

size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId)
assert.Nil(t, err)
Expand Down
9 changes: 8 additions & 1 deletion dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ const (

// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts.
func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) {
mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient)
mountApi := NewMarketAPI(
ctx,
repo,
pieceStorage,
gatewayMarketClient,
r.UseTransient,
r.MaxConcurrencyStorageCalls,
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand All @@ -40,7 +40,7 @@ require (
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
Expand Down Expand Up @@ -137,7 +137,6 @@ require (
github.com/filecoin-project/specs-actors/v4 v4.0.2 // indirect
github.com/filecoin-project/specs-actors/v5 v5.0.6 // indirect
github.com/filecoin-project/specs-actors/v6 v6.0.2 // indirect
github.com/filecoin-project/specs-storage v0.4.1 // indirect
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS
github.com/filecoin-project/go-state-types v0.1.1-0.20210810190654-139e0e79e69e/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.1/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.3/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.4/go.mod h1:xCA/WfKlC2zcn3fUmDv4IrzznwS98X5XW/irUP3Lhxg=
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
Expand Down Expand Up @@ -457,14 +456,13 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq
github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y=
github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M=
github.com/filecoin-project/specs-storage v0.4.1/go.mod h1:Z2eK6uMwAOSLjek6+sy0jNV2DSsMEENziMUz0GHRFBw=
github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI=
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 h1:6GCuzxLVHBzlz7y+FkbHh6n0UyoEGWqDwJKQPJoz7bE=
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b h1:T0TIlKIFsOvOKDIADN4P7xBC+coJ7YlbUqift11TCIE=
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8=
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826 h1:NnZjr537T102iDVdxm8e/5+t7gJmtfSkpay26KQS9LY=
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down Expand Up @@ -827,8 +825,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d h1:/ajr1CjW48GP0vwZgONtUOF74nImn6p4dybeGd0UF98=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d/go.mod h1:+TOIuiXWzfS5au9pK5oyU8AzDn/aMJhzSXXlQkVwf5A=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down
38 changes: 29 additions & 9 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"

"github.com/filecoin-project/dagstore/mount"

"github.com/filecoin-project/venus/pkg/util/fsutil"
"github.com/filecoin-project/venus/venus-shared/types/market"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/utils"
"github.com/filecoin-project/venus/pkg/util/fsutil"
)

type fsPieceStorage struct {
baseUrl string
fsCfg *config.FsPieceStorage
}

func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, error) {
func (f *fsPieceStorage) Len(_ context.Context, resourceId string) (int64, error) {
st, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
return 0, err
Expand All @@ -33,7 +35,7 @@ func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, err
return st.Size(), err
}

func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error) {
func (f *fsPieceStorage) ListResourceIds(_ context.Context) ([]string, error) {
entries, err := os.ReadDir(f.baseUrl)
if err != nil {
return nil, err
Expand All @@ -47,7 +49,7 @@ func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error)
return resources, nil
}

func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Reader) (int64, error) {
func (f *fsPieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reader) (int64, error) {
if f.fsCfg.ReadOnly {
return 0, fmt.Errorf("do not write to a 'readonly' piece store")
}
Expand All @@ -67,7 +69,7 @@ func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Rea
return wlen, err
}

func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string) (io.ReadCloser, error) {
func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -76,7 +78,7 @@ func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string)
return fs, nil
}

func (f *fsPieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) {
func (f *fsPieceStorage) GetMountReader(_ context.Context, resourceId string) (mount.Reader, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -89,7 +91,25 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er
return "", ErrUnsupportRedirect
}

func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, error) {
func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
if f.fsCfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
}

dstPath := path.Join(f.baseUrl, pieceCid)
transfer := market.FsTransfer{Path: dstPath}
params, err := json.Marshal(&transfer)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
}

return &market.Transfer{
Type: market.PiecesTransferFs,
Params: params,
}, nil
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
_, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -100,7 +120,7 @@ func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, erro
return true, nil
}

func (f *fsPieceStorage) Validate(resourceId string) error {
func (f *fsPieceStorage) Validate(_ string) error {
st, err := os.Stat(f.baseUrl)
if err != nil {
if os.IsNotExist(err) {
Expand Down
4 changes: 4 additions & 0 deletions piecestorage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st
return "", ErrUnsupportRedirect
}

func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) {
return &market.Transfer{}, nil
}

func (m *MemPieceStore) Validate(s string) error {
return nil
}
Expand Down
Loading

0 comments on commit e4b0c54

Please sign in to comment.