Skip to content

Commit

Permalink
Add s3 lens for working against s3 data (#222)
Browse files Browse the repository at this point in the history
* Add s3 lens for working against s3 data
* factor out common lens code
  • Loading branch information
willscott authored Nov 27, 2020
1 parent 7bd1d81 commit 8a77932
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 399 deletions.
3 changes: 3 additions & 0 deletions commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
carapi "github.com/filecoin-project/sentinel-visor/lens/carrepo"
vapi "github.com/filecoin-project/sentinel-visor/lens/lotus"
repoapi "github.com/filecoin-project/sentinel-visor/lens/lotusrepo"
s3api "github.com/filecoin-project/sentinel-visor/lens/s3repo"
sqlapi "github.com/filecoin-project/sentinel-visor/lens/sqlrepo"
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/storage"
Expand Down Expand Up @@ -58,6 +59,8 @@ func SetupStorageAndAPI(cctx *cli.Context) (context.Context, *RunContext, error)
opener, closer, err = carapi.NewAPIOpener(cctx)
} else if cctx.String("lens") == "sql" {
opener, closer, err = sqlapi.NewAPIOpener(cctx)
} else if cctx.String("lens") == "s3" {
opener, closer, err = s3api.NewAPIOpener(cctx)
}
if err != nil {
return nil, nil, xerrors.Errorf("get node api: %w", err)
Expand Down
205 changes: 11 additions & 194 deletions lens/carrepo/carrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,17 @@ package carrepo
import (
"context"
"fmt"
"io"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/sentinel-visor/lens"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/urfave/cli/v2"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/ulimit"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/sentinel-visor/lens/util"
"github.com/willscott/carbs"
)

type APIOpener struct {
// shared instance of the repo since the opener holds an exclusive lock on it
rapi *CarAPI
}

func NewAPIOpener(c *cli.Context) (*APIOpener, lens.APICloser, error) {
rapi := CarAPI{}

func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
if _, _, err := ulimit.ManageFdLimit(); err != nil {
return nil, nil, fmt.Errorf("setting file descriptor limit: %s", err)
}
Expand All @@ -49,171 +22,15 @@ func NewAPIOpener(c *cli.Context) (*APIOpener, lens.APICloser, error) {
if err != nil {
return nil, nil, err
}
cacheDB := NewCachingStore(db)
cacheDB := util.NewCachingStore(db)

r := repo.NewMemory(nil)

lr, err := r.Lock(repo.FullNode)
if err != nil {
return nil, nil, err
h := func(ctx context.Context, lookback int) (*types.TipSetKey, error) {
c, err := db.Roots()
if err != nil {
return nil, err
}
tsk := types.NewTipSetKey(c...)
return &tsk, nil
}

mds, err := lr.Datastore("/metadata")
if err != nil {
return nil, nil, err
}

cs := store.NewChainStore(cacheDB, cacheDB, mds, vm.Syscalls(&fakeVerifier{}), journal.NilJournal())

headKey, err := db.Roots()
if err != nil {
return nil, nil, err
}

headTs, err := cs.LoadTipSet(types.NewTipSetKey(headKey...))
if err != nil {
return nil, nil, fmt.Errorf("failed to load our own chainhead: %w", err)
}
if err := cs.SetHead(headTs); err != nil {
return nil, nil, fmt.Errorf("failed to set our own chainhead: %w", err)
}

sm := stmgr.NewStateManager(cs)

rapi.FullNodeAPI.ChainAPI.Chain = cs
rapi.FullNodeAPI.ChainAPI.ChainModuleAPI = &full.ChainModule{Chain: cs}
rapi.FullNodeAPI.StateAPI.Chain = cs
rapi.FullNodeAPI.StateAPI.StateManager = sm
rapi.FullNodeAPI.StateAPI.StateModuleAPI = &full.StateModule{Chain: cs, StateManager: sm}

sf := func() {
lr.Close()
}

rapi.Context = c.Context
rapi.cacheSize = c.Int("lens-cache-hint")
return &APIOpener{&rapi}, sf, nil
}

func (o *APIOpener) Open(ctx context.Context) (lens.API, lens.APICloser, error) {
return o.rapi, lens.APICloser(func() {}), nil
}

type CarAPI struct {
impl.FullNodeAPI
context.Context
cacheSize int
}

func (ra *CarAPI) ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap, gasPremium abi.TokenAmount) vm.GasOutputs {
return vm.ComputeGasOutputs(gasUsed, gasLimit, baseFee, feeCap, gasPremium)
}

func (ra *CarAPI) Store() adt.Store {
bs := ra.FullNodeAPI.ChainAPI.Chain.Blockstore()
bufferedStore := bufbstore.NewBufferedBstore(bs)
cs := cbor.NewCborStore(bufferedStore)
adtStore := adt.WrapStore(ra.Context, cs)
return adtStore
}

func (ra *CarAPI) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
return false, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) {
return api.QueryOffer{}, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error {
return fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientListImports(ctx context.Context) ([]api.Import, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
return fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
return api.DataSize{}, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
return fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return nil, fmt.Errorf("unsupported")
}

func (ra *CarAPI) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
return fmt.Errorf("unsupported")
}

// From https://github.com/ribasushi/ltsh/blob/5b0211033020570217b0ae37b50ee304566ac218/cmd/lotus-shed/deallifecycles.go#L41-L171
type fakeVerifier struct{}

var _ ffiwrapper.Verifier = (*fakeVerifier)(nil)

func (m fakeVerifier) VerifySeal(svi proof.SealVerifyInfo) (bool, error) {
return true, nil
}

func (m fakeVerifier) VerifyWinningPoSt(ctx context.Context, info proof.WinningPoStVerifyInfo) (bool, error) {
return true, nil
}

func (m fakeVerifier) VerifyWindowPoSt(ctx context.Context, info proof.WindowPoStVerifyInfo) (bool, error) {
return true, nil
}

func (m fakeVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proof abi.RegisteredPoStProof, id abi.ActorID, randomness abi.PoStRandomness, u uint64) ([]uint64, error) {
panic("GenerateWinningPoStSectorChallenge not supported")
return util.NewAPIOpener(c, cacheDB, h)
}
16 changes: 16 additions & 0 deletions lens/s3repo/repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package s3repo

import (
"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/lens/util"
"github.com/urfave/cli/v2"
)

func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
bs, err := NewBlockStore(c.String("repo"))
if err != nil {
return nil, nil, err
}

return util.NewAPIOpener(c, bs, bs.(*S3Blockstore).getMasterTsKey)
}
119 changes: 119 additions & 0 deletions lens/s3repo/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package s3repo

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"

logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

var log = logging.Logger("sql")

type S3Blockstore struct {
prefix string
client http.Client
}

func NewBlockStore(connstr string) (blockstore.Blockstore, error) {

sbs := &S3Blockstore{
prefix: connstr,
client: http.Client{},
}

// we do not currently use the Identity codec, but just in case...
return blockstore.WrapIDStore(sbs), nil
}

func (sbs *S3Blockstore) Has(c cid.Cid) (has bool, err error) {
resp, err := sbs.client.Head(sbs.prefix + c.String() + "/data.raw")
if err != nil {
return false, err
}
return resp.StatusCode == 200, nil
}

func (sbs *S3Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, fmt.Errorf("Not implemented")
}

func (sbs *S3Blockstore) Get(c cid.Cid) (blocks.Block, error) {
resp, err := sbs.client.Get(sbs.prefix + c.String() + "/data.raw")
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Failed to fetch: %v", resp.StatusCode)
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(buf, c)
}

func (sbs *S3Blockstore) GetSize(c cid.Cid) (size int, err error) {
resp, err := sbs.client.Head(sbs.prefix + c.String() + "/data.raw")
if err != nil {
return -1, err
}
if resp.StatusCode == 200 {
return int(resp.ContentLength), nil
}
return -1, fmt.Errorf("does not exist")
}

func (sbs *S3Blockstore) getMasterTsKey(ctx context.Context, lookback int) (*types.TipSetKey, error) {
resp, err := sbs.client.Get(sbs.prefix + "/head")
if err != nil {
return nil, err
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

cidStrs := strings.Split(string(buf), " ")
cids := make([]cid.Cid, len(cidStrs))
for _, cs := range cidStrs {
c, err := cid.Parse(cs)
if err != nil {
return nil, err
}
cids = append(cids, c)
}

tk := types.NewTipSetKey(cids...)
return &tk, nil
}

// BEGIN UNIMPLEMENTED

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (sbs *S3Blockstore) HashOnRead(enabled bool) {
log.Warn("HashOnRead toggle not implemented, ignoring")
}

// Put puts a given block to the underlying datastore
func (sbs *S3Blockstore) Put(b blocks.Block) (err error) {
return fmt.Errorf("Not Implemented")
}

func (sbs *S3Blockstore) PutMany(blks []blocks.Block) error {
return fmt.Errorf("Not Implemented")
}

func (sbs *S3Blockstore) DeleteBlock(cid.Cid) error {
return fmt.Errorf("Not Implemented")
}
Loading

0 comments on commit 8a77932

Please sign in to comment.