Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use new retrieval code path for indexers #1425

Merged
merged 4 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ const (
SettlePaymentChannelsKey
RunPeerTaggerKey
SetupFallbackBlockstoresKey
HandleSetLinkSystem

SetApiEndpointKey

Expand Down Expand Up @@ -524,6 +525,8 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(dtypes.ProviderTransferNetwork), modules.NewProviderTransferNetwork),
Override(new(*modules.ProxyAskGetter), modules.NewAskGetter),
Override(new(server.AskGetter), From(new(*modules.ProxyAskGetter))),
Override(new(*modules.LinkSystemProv), modules.NewLinkSystemProvider),
Override(new(server.LinkSystemProvider), From(new(*modules.LinkSystemProv))),
Override(new(*server.GraphsyncUnpaidRetrieval), modules.RetrievalGraphsync(cfg.LotusDealmaking.SimultaneousTransfersForStorage, cfg.LotusDealmaking.SimultaneousTransfersForStoragePerClient, cfg.LotusDealmaking.SimultaneousTransfersForRetrieval)),
Override(new(dtypes.StagingGraphsync), From(new(*server.GraphsyncUnpaidRetrieval))),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Expand Down Expand Up @@ -575,6 +578,7 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(HandleBoostDealsKey, modules.HandleBoostLibp2pDeals),
Override(HandleContractDealsKey, modules.HandleContractDeals(&cfg.ContractDeals)),
Override(HandleProposalLogCleanerKey, modules.HandleProposalLogCleaner(time.Duration(cfg.Dealmaking.DealProposalLogDuration))),
Override(HandleSetLinkSystem, modules.SetLinkSystem),

// Boost storage deal filter
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
Expand Down
37 changes: 30 additions & 7 deletions node/modules/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package modules

import (
"context"
"sync"
"time"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
retrievalimpl "github.com/filecoin-project/boost-gfm/retrievalmarket/impl"
"github.com/filecoin-project/boost-gfm/stores"
Expand All @@ -15,13 +18,16 @@ import (
"github.com/filecoin-project/lotus/metrics"
lotus_helpers "github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipld/go-ipld-prime"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/engine"
"github.com/libp2p/go-libp2p/core/host"
"go.opencensus.io/stats"
"go.uber.org/fx"
"sync"
"time"
)

var _ server.AskGetter = (*ProxyAskGetter)(nil)

// ProxyAskGetter is used to avoid circular dependencies:
// RetrievalProvider depends on RetrievalGraphsync, which depends on RetrievalProvider's
// GetAsk method.
Expand Down Expand Up @@ -49,9 +55,27 @@ func SetAskGetter(proxy *ProxyAskGetter, rp retrievalmarket.RetrievalProvider) {
proxy.AskGetter = rp
}

// LinkSystemProv is used to avoid circular dependencies
type LinkSystemProv struct {
*ipld.LinkSystem
}

func NewLinkSystemProvider() *LinkSystemProv {
return &LinkSystemProv{}
}

func (p *LinkSystemProv) LinkSys() *ipld.LinkSystem {
return p.LinkSystem
}

func SetLinkSystem(proxy *LinkSystemProv, prov provider.Interface) {
e := prov.(*engine.Engine)
proxy.LinkSystem = e.LinkSystem()
}

// RetrievalGraphsync creates a graphsync instance used to serve retrievals.
func RetrievalGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx lotus_helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.IndexBackedBlockstore, h host.Host, net dtypes.ProviderTransferNetwork, dealDecider dtypes.RetrievalDealFilter, dagStore stores.DAGStoreWrapper, pstore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, askGetter server.AskGetter) (*server.GraphsyncUnpaidRetrieval, error) {
return func(mctx lotus_helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.IndexBackedBlockstore, h host.Host, net dtypes.ProviderTransferNetwork, dealDecider dtypes.RetrievalDealFilter, dagStore stores.DAGStoreWrapper, pstore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, askGetter server.AskGetter) (*server.GraphsyncUnpaidRetrieval, error) {
func RetrievalGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx lotus_helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.IndexBackedBlockstore, h host.Host, net dtypes.ProviderTransferNetwork, dealDecider dtypes.RetrievalDealFilter, dagStore stores.DAGStoreWrapper, pstore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, askGetter server.AskGetter, ls server.LinkSystemProvider) (*server.GraphsyncUnpaidRetrieval, error) {
return func(mctx lotus_helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.IndexBackedBlockstore, h host.Host, net dtypes.ProviderTransferNetwork, dealDecider dtypes.RetrievalDealFilter, dagStore stores.DAGStoreWrapper, pstore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, askGetter server.AskGetter, ls server.LinkSystemProvider) (*server.GraphsyncUnpaidRetrieval, error) {
// Create a Graphsync instance
mkgs := Graphsync(parallelTransfersForStorage, parallelTransfersForStoragePerPeer, parallelTransfersForRetrieval)
gs := mkgs(mctx, lc, ibs, h)
Expand All @@ -64,14 +88,13 @@ func RetrievalGraphsync(parallelTransfersForStorage uint64, parallelTransfersFor
SectorAccessor: sa,
AskStore: askGetter,
}
gsupr, err := server.NewGraphsyncUnpaidRetrieval(h.ID(), gs, net, vdeps)
gsupr, err := server.NewGraphsyncUnpaidRetrieval(h.ID(), gs, net, vdeps, ls)

// Set up a context that is cancelled when the boostd process exits
gsctx, cancel := context.WithCancel(context.Background())
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
gsupr.Start(gsctx)
return nil
return gsupr.Start(gsctx)
},
OnStop: func(_ context.Context) error {
cancel()
Expand Down
28 changes: 6 additions & 22 deletions node/modules/storageminer_idxprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/filecoin-project/boost/build"
"github.com/filecoin-project/boost/indexprovider"
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/transport/graphsync"
Expand Down Expand Up @@ -150,24 +151,16 @@ type indexerDT struct {

var _ datatransferv2.Manager = (*indexerDT)(nil)

type legsVoucherDTv1 struct {
dtsync.Voucher
}

func (l *legsVoucherDTv1) Type() datatransfer.TypeIdentifier {
return datatransfer.TypeIdentifier(dtsync.LegsVoucherType)
}

func (i *indexerDT) RegisterVoucherType(voucherType datatransferv2.TypeIdentifier, validator datatransferv2.RequestValidator) error {
if voucherType == dtsync.LegsVoucherType {
return i.dt.RegisterVoucherType(&legsVoucherDTv1{}, &dtv1ReqValidator{v: validator})
return i.dt.RegisterVoucherType(&types.LegsVoucherDTv1{}, &dtv1ReqValidator{v: validator})
}
return fmt.Errorf("unrecognized voucher type: %s", voucherType)
}

func (i *indexerDT) RegisterTransportConfigurer(voucherType datatransferv2.TypeIdentifier, configurer datatransferv2.TransportConfigurer) error {
if voucherType == dtsync.LegsVoucherType {
return i.dt.RegisterTransportConfigurer(&legsVoucherDTv1{}, func(chid datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) {
return i.dt.RegisterTransportConfigurer(&types.LegsVoucherDTv1{}, func(chid datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) {
gsTransport, ok := transport.(*graphsync.Transport)
if ok {
err := gsTransport.UseStore(chid, i.linksys())
Expand Down Expand Up @@ -245,21 +238,12 @@ func (i *indexerDT) RestartDataTransferChannel(ctx context.Context, chid datatra
return fmt.Errorf("not implemented")
}

type dtv1VoucherResult struct {
voucherType datatransferv2.TypeIdentifier
dtsync.VoucherResult
}

func (d *dtv1VoucherResult) Type() datatransfer.TypeIdentifier {
return datatransfer.TypeIdentifier(d.voucherType)
}

type dtv1ReqValidator struct {
v datatransferv2.RequestValidator
}

func (d *dtv1ReqValidator) ValidatePush(isRestart bool, chid datatransfer.ChannelID, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
d2v := dtsync.BindnodeRegistry.TypeToNode(&voucher.(*legsVoucherDTv1).Voucher)
d2v := dtsync.BindnodeRegistry.TypeToNode(&voucher.(*types.LegsVoucherDTv1).Voucher)
res, err := d.v.ValidatePush(toChannelIDV2(chid), sender, d2v, baseCid, selector)
if err != nil {
return nil, err
Expand All @@ -272,7 +256,7 @@ func (d *dtv1ReqValidator) ValidatePush(isRestart bool, chid datatransfer.Channe
}

func (d *dtv1ReqValidator) ValidatePull(isRestart bool, chid datatransfer.ChannelID, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
d2v := dtsync.BindnodeRegistry.TypeToNode(&voucher.(*legsVoucherDTv1).Voucher)
d2v := dtsync.BindnodeRegistry.TypeToNode(&voucher.(*types.LegsVoucherDTv1).Voucher)
res, err := d.v.ValidatePull(toChannelIDV2(chid), receiver, d2v, baseCid, selector)
if err != nil {
return nil, err
Expand All @@ -294,7 +278,7 @@ func toVoucherResult(res datatransferv2.ValidationResult) (datatransfer.VoucherR
if vr == nil {
return nil, fmt.Errorf("got nil VoucherResult from ValidationResult")
}
return &dtv1VoucherResult{VoucherResult: *vr, voucherType: res.VoucherResult.Type}, nil
return &types.LegsVoucherResultDtv1{VoucherResult: *vr, VoucherType: res.VoucherResult.Type}, nil
}

func toChannelIDV2(chid datatransfer.ChannelID) datatransferv2.ChannelID {
Expand Down
10 changes: 8 additions & 2 deletions retrievalmarket/server/channelstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ import (
cbg "github.com/whyrusleeping/cbor-gen"
)

type RetrievalType string

const RetrievalTypeDeal RetrievalType = "Deal"
const RetrievalTypeLegs RetrievalType = "Legs"

type retrievalState struct {
cs *channelState
mkts *retrievalmarket.ProviderDealState
retType RetrievalType
cs *channelState
mkts *retrievalmarket.ProviderDealState
}

func (r retrievalState) ChannelState() channelState { return *r.cs }
Expand Down
Loading