Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix badger double open on daemon --import-snapshot; chainstore lifecycle #4872

Merged
merged 6 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/store/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestIndexSeeks(t *testing.T) {

nbs := blockstore.NewTemporarySync()
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
defer cs.Close() //nolint:errcheck

_, err = cs.Import(bytes.NewReader(gencar))
if err != nil {
Expand Down
29 changes: 21 additions & 8 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,30 @@ type ChainStore struct {

evtTypes [1]journal.EventType
journal journal.Journal

cancelFn context.CancelFunc
wg sync.WaitGroup
}

// localbs is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsCache, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
j = journal.NilJournal()
}

ctx, cancel := context.WithCancel(context.Background())
cs := &ChainStore{
bs: bs,
localbs: localbs,
ds: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
mmCache: c,
tsCache: tsc,
mmCache: mmCache,
tsCache: tsCache,
vmcalls: vmcalls,
cancelFn: cancel,
journal: j,
}

Expand Down Expand Up @@ -191,19 +197,24 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba
}

hcmetric := func(rev, app []*types.TipSet) error {
ctx := context.Background()
for _, r := range app {
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height())))
}
return nil
}

cs.reorgNotifeeCh = make(chan ReorgNotifee)
cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric})
cs.reorgCh = cs.reorgWorker(ctx, []ReorgNotifee{hcnf, hcmetric})

return cs
}

func (cs *ChainStore) Close() error {
cs.cancelFn()
cs.wg.Wait()
return nil
}

func (cs *ChainStore) Load() error {
head, err := cs.ds.Get(chainHeadKey)
if err == dstore.ErrNotFound {
Expand Down Expand Up @@ -383,7 +394,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
// particular tipset to carry out a benchmark, verification, etc. on a chain
// segment.
func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error {
log.Warnf("(!!!) forcing a new head silently; only use this only for testing; new head: %s", ts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no valid use for this outside testing before. When importing snapshot it doesn't make sense to emit reorg notifications as nothing is listening on that chainstore, and everything that cares about chain state does check current state when subscribing

tldr this is fine

log.Warnf("(!!!) forcing a new head silently; new head: %s", ts)

cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
Expand All @@ -406,7 +417,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
notifees := make([]ReorgNotifee, len(initialNotifees))
copy(notifees, initialNotifees)

cs.wg.Add(1)
go func() {
defer cs.wg.Done()
defer log.Warn("reorgWorker quit")

for {
Expand Down
4 changes: 4 additions & 0 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func BenchmarkGetRandomness(b *testing.B) {
}

cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck

b.ResetTimer()

Expand Down Expand Up @@ -105,6 +106,7 @@ func TestChainExportImport(t *testing.T) {

nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(buf)
if err != nil {
Expand Down Expand Up @@ -139,6 +141,8 @@ func TestChainExportImportFull(t *testing.T) {

nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(buf)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ var importBenchCmd = &cli.Command{

metadataDs := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil)
defer cs.Close() //nolint:errcheck

stm := stmgr.NewStateManager(cs)

startTime := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var chainBalanceStateCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
Expand Down Expand Up @@ -409,6 +410,7 @@ var chainPledgeCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var exportChainCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck

if err := cs.Load(); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-shed/genesis-verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var genesisVerifyCmd = &cli.Command{
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())

cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

cf := cctx.Args().Get(0)
f, err := os.Open(cf)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ var stateTreePruneCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}
Expand Down
12 changes: 3 additions & 9 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,6 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
return xerrors.Errorf("failed to open blockstore: %w", err)
}

defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()

mds, err := lr.Datastore("/metadata")
if err != nil {
return err
Expand All @@ -427,7 +419,9 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
if err != nil {
return xerrors.Errorf("failed to open journal: %w", err)
}

cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
defer cst.Close() //nolint:errcheck

log.Infof("importing chain from %s...", fname)

Expand Down Expand Up @@ -472,7 +466,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
}

log.Infof("accepting %s as new head", ts.Cids())
if err := cst.SetHead(ts); err != nil {
if err := cst.ForceHeadSilent(context.Background(), ts); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions conformance/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
sm = stmgr.NewStateManager(cs)
)

defer cs.Close() //nolint:errcheck

blocks := make([]store.BlockMessages, 0, len(tipset.Blocks))
for _, b := range tipset.Blocks {
sb := store.BlockMessages{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.0.2
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86
github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU=
github.com/ipfs/go-ipfs-blockstore v1.0.1 h1:fnuVj4XdZp4yExhd0CnUwAiMNJHiPnfInhiuwz4lW1w=
github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E=
github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f h1:AQQb5zZj7KKTEFh9EaAUXc5Q+F7SbYkjfYogZnEzfUc=
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw=
Expand Down
8 changes: 7 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,19 @@ func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap
return nil
}

func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)

if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}

lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return chain.Close()
},
})

return chain
}

Expand Down