diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 01c53f61e20..dee6e784cdd 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -98,6 +98,10 @@ type Config struct { // and directly purges cold blocks. DiscardColdBlocks bool + // UniversalColdBlocks indicates whether all blocks being garbage collected and purged + // from the hotstore should be written to the cold store + UniversalColdBlocks bool + // HotstoreMessageRetention indicates the hotstore retention policy for messages. // It has the following semantics: // - a value of 0 will only retain messages within the compaction boundary (4 finalities) @@ -111,21 +115,6 @@ type Config struct { // A positive value is the number of compactions before a full GC is performed; // a value of 1 will perform full GC in every compaction. HotStoreFullGCFrequency uint64 - - // EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning - // where hotstore compaction occurs every finality epochs pruning happens every 3 finalities - // Default is false - EnableColdStoreAutoPrune bool - - // ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore. - // Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do - // full GC in every prune. - // Default is 7 (about once every a week) - ColdStoreFullGCFrequency uint64 - - // ColdStoreRetention specifies the retention policy for data reachable from the chain, in - // finalities beyond the compaction boundary, default is 0, -1 retains everything - ColdStoreRetention int64 } // ChainAccessor allows the Splitstore to access the chain. It will most likely diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index 6452b3ee2f6..33651598020 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -125,7 +125,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { } return nil - }) + }, func(cid.Cid) error { return nil }) if err != nil { err = xerrors.Errorf("error walking chain: %w", err) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 272d0afabe1..1c4c903ffd8 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -20,7 +20,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" - bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" @@ -134,39 +133,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { log.Infow("compaction done", "took", time.Since(start)) }() // only prune if auto prune is enabled and after at least one compaction - } else if s.cfg.EnableColdStoreAutoPrune && epoch-s.pruneEpoch > PruneThreshold && s.compactionIndex > 0 { - s.beginTxnProtect() - s.compactType = cold - go func() { - defer atomic.StoreInt32(&s.compacting, 0) - defer s.endTxnProtect() - - log.Info("pruning splitstore") - start := time.Now() - - var retainP func(int64) bool - switch { - case s.cfg.ColdStoreRetention > int64(0): - retainP = func(depth int64) bool { - return depth <= int64(CompactionBoundary)+s.cfg.ColdStoreRetention*int64(build.Finality) - } - case s.cfg.ColdStoreRetention < 0: - retainP = func(_ int64) bool { return true } - default: - retainP = func(depth int64) bool { - 
return depth <= int64(CompactionBoundary) - } - } - movingGC := s.cfg.ColdStoreFullGCFrequency > 0 && s.pruneIndex%int64(s.cfg.ColdStoreFullGCFrequency) == 0 - var gcOpts []bstore.BlockstoreGCOption - if movingGC { - gcOpts = append(gcOpts, bstore.WithFullGC(true)) - } - doGC := func() error { return s.gcBlockstore(s.cold, gcOpts) } - - s.prune(curTs, retainP, doGC) - log.Infow("prune done", "took", time.Since(start)) - }() } else { // no compaction necessary atomic.StoreInt32(&s.compacting, 0) @@ -562,6 +528,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer markSet.Close() //nolint:errcheck defer s.debug.Flush() + coldSet, err := s.markSetEnv.New("cold", s.markSetSize) + if err != nil { + return xerrors.Errorf("error creating cold mark set: %w", err) + } + defer coldSet.Close() //nolint:errcheck + if err := s.checkClosing(); err != nil { return err } @@ -580,24 +552,52 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { startMark := time.Now() count := new(int64) - err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, - func(c cid.Cid) error { - if isUnitaryObject(c) { - return errStopWalk - } - visit, err := markSet.Visit(c) - if err != nil { - return xerrors.Errorf("error visiting object: %w", err) - } + coldCount := new(int64) + fCold := func(c cid.Cid) error { + // Writes to cold set optimized away in universal and discard mode + // + // Nothing gets written to cold store in discard mode so no cold objects to write + // Everything not marked hot gets written to cold store in universal mode so no need to track cold objects separately + if s.cfg.DiscardColdBlocks || s.cfg.UniversalColdBlocks { + return nil + } - if !visit { - return errStopWalk - } + if isUnitaryObject(c) { + return errStopWalk + } - atomic.AddInt64(count, 1) - return nil - }) + visit, err := coldSet.Visit(c) + if err != nil { + return xerrors.Errorf("error visiting object: %w", err) + } + + if !visit { + return errStopWalk + } + + atomic.AddInt64(coldCount, 1) + return nil + } + fHot := func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + visit, err := markSet.Visit(c) + if err != nil { + return xerrors.Errorf("error visiting object: %w", err) + } + + if !visit { + return errStopWalk + } + + atomic.AddInt64(count, 1) + return nil + } + + err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold) if err != nil { return xerrors.Errorf("error marking: %w", err) @@ -631,8 +631,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } defer coldw.Close() //nolint:errcheck + purgew, err := NewColdSetWriter(s.discardSetPath()) + if err != nil { + return xerrors.Errorf("error creating deadset: %w", err) + } + defer purgew.Close() //nolint:errcheck + // some stats for logging - var hotCnt, coldCnt int + var hotCnt, coldCnt, purgeCnt int err = s.hot.ForEachKey(func(c cid.Cid) error { // was it marked? 
mark, err := markSet.Has(c) @@ -645,9 +651,27 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } - // it's cold, mark it as candidate for move + // it needs to be removed from hot store, mark it as candidate for purge + if err := purgew.Write(c); err != nil { + return xerrors.Errorf("error writing cid to purge set: %w", err) + } + purgeCnt++ + + coldMark, err := coldSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking cold mark set for %s: %w", c, err) + } + + // Discard mode: coldMark == false, s.cfg.UniversalColdBlocks == false, always return here, no writes to cold store + // Universal mode: coldMark == false, s.cfg.UniversalColdBlocks == true, never stop here, all writes to cold store + // Otherwise: s.cfg.UniversalColdBlocks == false, if !coldMark stop here and don't write to cold store, if coldMark continue and write to cold store + if !coldMark && !s.cfg.UniversalColdBlocks { // universal mode means mark everything as cold + return nil + } + + // it's cold, mark as candidate for move if err := coldw.Write(c); err != nil { - return xerrors.Errorf("error writing cid to coldstore: %w", err) + return xerrors.Errorf("error writing cid to cold set: %w", err) } coldCnt++ @@ -656,7 +680,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { if err != nil { return xerrors.Errorf("error collecting cold objects: %w", err) } - + if err := purgew.Close(); err != nil { + return xerrors.Errorf("error closing purge set: %w", err) + } if err := coldw.Close(); err != nil { return xerrors.Errorf("error closing coldset: %w", err) } @@ -705,6 +731,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } } + purger, err := NewColdSetReader(s.discardSetPath()) + if err != nil { + return xerrors.Errorf("error opening discard set: %w", err) + } + defer purger.Close() //nolint:errcheck + // 4. Purge cold objects with checkpointing for recovery. // This is the critical section of compaction, whereby any cold object not in the markSet is // considered already deleted. @@ -736,7 +768,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // 5. purge cold objects from the hotstore, taking protected references into account log.Info("purging cold objects from the hotstore") startPurge := time.Now() - err = s.purge(coldr, checkpoint, markSet) + err = s.purge(purger, checkpoint, markSet) if err != nil { return xerrors.Errorf("error purging cold objects: %w", err) } @@ -864,7 +896,7 @@ func (s *SplitStore) endCriticalSection() { } func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, - visitor ObjectVisitor, f func(cid.Cid) error) error { + visitor ObjectVisitor, fHot, fCold func(cid.Cid) error) error { var walked ObjectVisitor var mx sync.Mutex // we copy the tipset first into a new slice, which allows us to reuse it in every epoch. @@ -886,7 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp atomic.AddInt64(walkCnt, 1) - if err := f(c); err != nil { + if err := fHot(c); err != nil { return err } @@ -904,27 +936,37 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp if inclMsgs < inclState { // we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we // synced from snapshot and have a long HotStoreMessageRetentionPolicy.
- if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil { + if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) } - if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil { + if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) } } else { - if err := s.walkObject(hdr.Messages, visitor, f); err != nil { + if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) } - if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil { + if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil { return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) } } } + // messages and receipts outside of inclMsgs are included in the cold store + if hdr.Height < inclMsgs && hdr.Height > 0 { + if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil { + return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } + if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil { + return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) + } + } + // state is only retained if within the inclState boundary, with the exception of genesis if hdr.Height >= inclState || hdr.Height == 0 { - if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil { + if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil { return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) } atomic.AddInt64(scanCnt, 1) @@ -1296,7 +1338,7 @@ func (s *SplitStore) coldSetPath() string { return filepath.Join(s.path, "coldset") } -func (s *SplitStore) deadSetPath() string { +func (s *SplitStore) discardSetPath() string { return filepath.Join(s.path, "deadset") } diff --git a/blockstore/splitstore/splitstore_prune.go b/blockstore/splitstore/splitstore_prune.go index 7d54d8e4ebf..6a26c00d271 100644 --- a/blockstore/splitstore/splitstore_prune.go +++ b/blockstore/splitstore/splitstore_prune.go @@ -208,7 +208,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, log.Info("collecting dead objects") startCollect := time.Now() - deadw, err := NewColdSetWriter(s.deadSetPath()) + deadw, err := NewColdSetWriter(s.discardSetPath()) if err != nil { return xerrors.Errorf("error creating coldset: %w", err) } @@ -267,7 +267,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, return err } - deadr, err := NewColdSetReader(s.deadSetPath()) + deadr, err := NewColdSetReader(s.discardSetPath()) if err != nil { return xerrors.Errorf("error opening deadset: %w", err) } @@ -311,10 +311,10 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, log.Warnf("error removing checkpoint: %s", err) } if err := deadr.Close(); err != nil { - log.Warnf("error closing deadset: %s", err) + log.Warnf("error closing discard set: %s", err) } - if err := os.Remove(s.deadSetPath()); err != nil { - log.Warnf("error removing deadset: %s", err) + if err := os.Remove(s.discardSetPath()); err != nil { + log.Warnf("error 
removing discard set: %s", err) } // we are done; do some housekeeping @@ -344,7 +344,7 @@ func (s *SplitStore) completePrune() error { } defer checkpoint.Close() //nolint:errcheck - deadr, err := NewColdSetReader(s.deadSetPath()) + deadr, err := NewColdSetReader(s.discardSetPath()) if err != nil { return xerrors.Errorf("error opening deadset: %w", err) } @@ -378,7 +378,7 @@ func (s *SplitStore) completePrune() error { if err := deadr.Close(); err != nil { log.Warnf("error closing deadset: %s", err) } - if err := os.Remove(s.deadSetPath()); err != nil { + if err := os.Remove(s.discardSetPath()); err != nil { log.Warnf("error removing deadset: %s", err) } diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index d23a78cf08b..750a2efeda5 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -38,6 +38,7 @@ func init() { func testSplitStore(t *testing.T, cfg *Config) { ctx := context.Background() chain := &mockChain{t: t} + fmt.Printf("Config: %v\n", cfg) // the myriads of stores ds := dssync.MutexWrap(datastore.NewMapDatastore()) @@ -225,7 +226,7 @@ func TestSplitStoreCompaction(t *testing.T) { //stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001 //stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001 //stm: @SPLITSTORE_SPLITSTORE_CLOSE_001 - testSplitStore(t, &Config{MarkSetType: "map"}) + testSplitStore(t, &Config{MarkSetType: "map", UniversalColdBlocks: true}) } func TestSplitStoreCompactionWithBadger(t *testing.T) { @@ -237,7 +238,7 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) { t.Cleanup(func() { badgerMarkSetBatchSize = bs }) - testSplitStore(t, &Config{MarkSetType: "badger"}) + testSplitStore(t, &Config{MarkSetType: "badger", UniversalColdBlocks: true}) } func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { @@ -283,7 +284,7 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { path := t.TempDir() // open the splitstore - ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true}) if err != nil { t.Fatal(err) } @@ -422,7 +423,7 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore. 
path := t.TempDir() - ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true}) if err != nil { t.Fatal(err) } @@ -522,7 +523,7 @@ func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blocks path := t.TempDir() - ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true}) if err != nil { t.Fatal(err) } diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index e1a1211257b..e387263dae7 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -110,7 +110,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { mx.Unlock() return nil - }) + }, func(cid.Cid) error { return nil }) if err != nil { return err diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 27162a6ae08..bfb1aa2280e 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -166,11 +166,11 @@ [Chainstore.Splitstore] # ColdStoreType specifies the type of the coldstore. - # It can be "universal" (default) or "discard" for discarding cold blocks. + # It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks. # # type: string # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORETYPE - #ColdStoreType = "universal" + #ColdStoreType = "messages" # HotStoreType specifies the type of the hotstore. # Only currently supported value is "badger". @@ -201,28 +201,4 @@ # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY #HotStoreFullGCFrequency = 20 - # EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning - # where hotstore compaction occurs every finality epochs pruning happens every 3 finalities - # Default is false - # - # type: bool - # env var: LOTUS_CHAINSTORE_SPLITSTORE_ENABLECOLDSTOREAUTOPRUNE - #EnableColdStoreAutoPrune = false - - # ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore. - # Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do - # full GC in every prune. 
- # Default is 7 (about once every a week) - # - # type: uint64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTOREFULLGCFREQUENCY - #ColdStoreFullGCFrequency = 7 - - # ColdStoreRetention specifies the retention policy for data reachable from the chain, in - # finalities beyond the compaction boundary, default is 0, -1 retains everything - # - # type: int64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORERETENTION - #ColdStoreRetention = 0 - diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 2849485a93d..9c482700c26 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -256,9 +256,6 @@ type CfgOption func(cfg *config.FullNode) error func SplitstoreDiscard() NodeOpt { return WithCfgOpt(func(cfg *config.FullNode) error { - //cfg.Chainstore.Splitstore.HotStoreType = "badger" // default - //cfg.Chainstore.Splitstore.MarkSetType = "badger" // default - //cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default cfg.Chainstore.EnableSplitstore = true cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc cfg.Chainstore.Splitstore.ColdStoreType = "discard" // no cold store @@ -268,9 +265,6 @@ func SplitstoreDiscard() NodeOpt { func SplitstoreUniversal() NodeOpt { return WithCfgOpt(func(cfg *config.FullNode) error { - //cfg.Chainstore.Splitstore.HotStoreType = "badger" // default - //cfg.Chainstore.Splitstore.MarkSetType = "badger" // default - //cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default cfg.Chainstore.EnableSplitstore = true cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc cfg.Chainstore.Splitstore.ColdStoreType = "universal" // universal bs is coldstore @@ -278,10 +272,11 @@ func SplitstoreUniversal() NodeOpt { }) } -func SplitstoreAutoPrune() NodeOpt { +func SplitstoreMessges() NodeOpt { return WithCfgOpt(func(cfg *config.FullNode) error { - cfg.Chainstore.Splitstore.EnableColdStoreAutoPrune = true // turn on - cfg.Chainstore.Splitstore.ColdStoreFullGCFrequency = 0 // turn off full gc + cfg.Chainstore.EnableSplitstore = true + cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc + cfg.Chainstore.Splitstore.ColdStoreType = "messages" // universal bs is coldstore, and it accepts messages return nil }) } diff --git a/itests/splitstore_test.go b/itests/splitstore_test.go index db74e86a9e7..957efe32fdf 100644 --- a/itests/splitstore_test.go +++ b/itests/splitstore_test.go @@ -63,7 +63,16 @@ func TestHotstoreCompactCleansGarbage(t *testing.T) { // create garbage g := NewGarbager(ctx, t, full) - garbage, e := g.Drop(ctx) + // state + garbageS, eS := g.Drop(ctx) + // message + garbageM, eM := g.Message(ctx) + e := eM + if eS > eM { + e = eS + } + assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup @@ -94,7 +103,8 @@ func TestHotstoreCompactCleansGarbage(t *testing.T) { waitForCompaction(ctx, t, garbageCompactionIndex, full) // check that garbage is cleaned up - assert.False(t, g.Exists(ctx, garbage), "Garbage still exists in blockstore") + assert.False(t, g.Exists(ctx, garbageS), "Garbage state still exists in blockstore") + assert.False(t, g.Exists(ctx, garbageM), "Garbage message still exists in blockstore") } // Create unreachable state @@ -112,8 +122,16 @@ func TestColdStorePrune(t *testing.T) { // create garbage g := NewGarbager(ctx, t, full) - garbage, e := g.Drop(ctx) - 
assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore") + // state + garbageS, eS := g.Drop(ctx) + // message + garbageM, eM := g.Message(ctx) + e := eM + if eS > eM { + e = eS + } + assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup @@ -148,7 +166,8 @@ func TestColdStorePrune(t *testing.T) { // This data should now be moved to the coldstore. // Access it without hotview to keep it there while checking that it still exists // Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good - assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") bm.Restart() // wait for compaction to finsih and pause to make sure it doesn't start to avoid racing @@ -165,14 +184,15 @@ func TestColdStorePrune(t *testing.T) { require.NoError(t, full.ChainPrune(ctx, pruneOpts)) bm.Restart() waitForPrune(ctx, t, 1, full) - assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store after prune but it's still there") + assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state should be removed from cold store after prune but it's still there") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message should be on the cold store after prune") } -func TestAutoPrune(t *testing.T) { +func TestMessagesMode(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false - opts := []interface{}{kit.MockProofs(), kit.SplitstoreUniversal(), kit.SplitstoreAutoPrune(), kit.FsRepo()} + opts := []interface{}{kit.MockProofs(), kit.SplitstoreMessges(), kit.FsRepo()} full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...) bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0] _ = full @@ -180,8 +200,16 @@ func TestAutoPrune(t *testing.T) { // create garbage g := NewGarbager(ctx, t, full) - garbage, e := g.Drop(ctx) - assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore") + // state + garbageS, eS := g.Drop(ctx) + // message + garbageM, eM := g.Message(ctx) + e := eM + if eS > eM { + e = eS + } + assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup @@ -213,13 +241,12 @@ func TestAutoPrune(t *testing.T) { bm.Pause() - // This data should now be moved to the coldstore. 
+ // Messages should be moved to the coldstore + // State should be gced // Access it without hotview to keep it there while checking that it still exists // Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good - assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore") - bm.Restart() - waitForPrune(ctx, t, 1, full) - assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store through auto prune but it's still there") + assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") + assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") } func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) { @@ -304,7 +331,7 @@ func NewGarbager(ctx context.Context, t *testing.T, n *kit.TestFullNode) *Garbag latest: 0, maddr4Data: address.Undef, } - g.createMiner(ctx) + g.createMiner4Data(ctx) g.newPeerID(ctx) return g } @@ -320,6 +347,12 @@ func (g *Garbager) Drop(ctx context.Context) (cid.Cid, abi.ChainEpoch) { return c, g.newPeerID(ctx) } +// message returns the cid referencing a message and the chain epoch it went on chain +func (g *Garbager) Message(ctx context.Context) (cid.Cid, abi.ChainEpoch) { + mw := g.createMiner(ctx) + return mw.Message, mw.Height +} + // exists checks whether the cid is reachable through the node func (g *Garbager) Exists(ctx context.Context, c cid.Cid) bool { // check chain get / blockstore get @@ -374,8 +407,15 @@ func (g *Garbager) mInfoCid(ctx context.Context) cid.Cid { return mSt.Info } -func (g *Garbager) createMiner(ctx context.Context) { +func (g *Garbager) createMiner4Data(ctx context.Context) { require.True(g.t, g.maddr4Data == address.Undef, "garbager miner actor already created") + mw := g.createMiner(ctx) + var retval power6.CreateMinerReturn + require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))) + g.maddr4Data = retval.IDAddress +} + +func (g *Garbager) createMiner(ctx context.Context) *lapi.MsgLookup { owner, err := g.node.WalletDefaultAddress(ctx) require.NoError(g.t, err) worker := owner @@ -401,8 +441,5 @@ func (g *Garbager) createMiner(ctx context.Context) { mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, lapi.LookbackNoLimit, true) require.NoError(g.t, err) require.True(g.t, mw.Receipt.ExitCode == 0, "garbager's internal create miner message failed") - - var retval power6.CreateMinerReturn - require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))) - g.maddr4Data = retval.IDAddress + return mw } diff --git a/node/builder_chain.go b/node/builder_chain.go index e24b2609734..7a96e163c20 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -181,7 +181,7 @@ func ConfigFullNode(c interface{}) Option { Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), If(cfg.Chainstore.EnableSplitstore, - If(cfg.Chainstore.Splitstore.ColdStoreType == "universal", + If(cfg.Chainstore.Splitstore.ColdStoreType == "universal" || cfg.Chainstore.Splitstore.ColdStoreType == "messages", Override(new(dtypes.ColdBlockstore), From(new(dtypes.UniversalBlockstore)))), If(cfg.Chainstore.Splitstore.ColdStoreType == "discard", Override(new(dtypes.ColdBlockstore), modules.DiscardColdBlockstore)), diff --git a/node/config/def.go b/node/config/def.go index 0566c7d9958..a6e6fc66aa3 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -91,12 +91,11 @@ func DefaultFullNode() *FullNode { Chainstore: Chainstore{ 
EnableSplitstore: false, Splitstore: Splitstore{ - ColdStoreType: "universal", + ColdStoreType: "messages", HotStoreType: "badger", MarkSetType: "badger", - HotStoreFullGCFrequency: 20, - ColdStoreFullGCFrequency: 7, + HotStoreFullGCFrequency: 20, }, }, } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 80538c70e40..902cb1a058e 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -1116,7 +1116,7 @@ submitting proofs to the chain individually`, Type: "string", Comment: `ColdStoreType specifies the type of the coldstore. -It can be "universal" (default) or "discard" for discarding cold blocks.`, +It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks.`, }, { Name: "HotStoreType", @@ -1147,30 +1147,6 @@ the compaction boundary; default is 0.`, A value of 0 disables, while a value 1 will do full GC in every compaction. Default is 20 (about once a week).`, }, - { - Name: "EnableColdStoreAutoPrune", - Type: "bool", - - Comment: `EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning -where hotstore compaction occurs every finality epochs pruning happens every 3 finalities -Default is false`, - }, - { - Name: "ColdStoreFullGCFrequency", - Type: "uint64", - - Comment: `ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore. -Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do -full GC in every prune. -Default is 7 (about once every a week)`, - }, - { - Name: "ColdStoreRetention", - Type: "int64", - - Comment: `ColdStoreRetention specifies the retention policy for data reachable from the chain, in -finalities beyond the compaction boundary, default is 0, -1 retains everything`, - }, }, "StorageMiner": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index df5f018e3c7..dbfa2e43272 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -555,7 +555,7 @@ type Chainstore struct { type Splitstore struct { // ColdStoreType specifies the type of the coldstore. - // It can be "universal" (default) or "discard" for discarding cold blocks. + // It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks. ColdStoreType string // HotStoreType specifies the type of the hotstore. // Only currently supported value is "badger". @@ -571,21 +571,6 @@ type Splitstore struct { // A value of 0 disables, while a value 1 will do full GC in every compaction. // Default is 20 (about once a week). HotStoreFullGCFrequency uint64 - - // EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning - // where hotstore compaction occurs every finality epochs pruning happens every 3 finalities - // Default is false - EnableColdStoreAutoPrune bool - - // ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore. - // Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do - // full GC in every prune. 
- // Default is 7 (about once every a week) - ColdStoreFullGCFrequency uint64 - - // ColdStoreRetention specifies the retention policy for data reachable from the chain, in - // finalities beyond the compaction boundary, default is 0, -1 retains everything - ColdStoreRetention int64 } // // Full Node diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 21ce4187562..90b7b6183b9 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -84,11 +84,9 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked cfg := &splitstore.Config{ MarkSetType: cfg.Splitstore.MarkSetType, DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", + UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal", HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency, - EnableColdStoreAutoPrune: cfg.Splitstore.EnableColdStoreAutoPrune, - ColdStoreFullGCFrequency: cfg.Splitstore.ColdStoreFullGCFrequency, - ColdStoreRetention: cfg.Splitstore.ColdStoreRetention, } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil {
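Reviewer note, not part of the patch: the per-key decision made in doCompact above (keep, purge, or copy to the coldstore before purging) can be summarized by the standalone Go sketch below. The config struct and the classify helper are hypothetical stand-ins for splitstore.Config and the ForEachKey loop; plain booleans replace the real MarkSet lookups.

package main

import "fmt"

// config mirrors the two splitstore.Config flags this patch relies on.
type config struct {
	DiscardColdBlocks   bool // "discard" coldstore type
	UniversalColdBlocks bool // "universal" coldstore type
}

// classify reports what compaction does with a hotstore object, given the
// hot mark (reachable within the boundary) and the cold mark (reached by the
// message/receipt walk below the boundary).
func classify(cfg config, hotMark, coldMark bool) string {
	if hotMark {
		return "keep in hotstore"
	}
	// everything not marked hot is written to the purge (discard) set
	if cfg.DiscardColdBlocks {
		return "purge only"
	}
	if coldMark || cfg.UniversalColdBlocks {
		return "copy to coldstore, then purge"
	}
	return "purge only"
}

func main() {
	messages := config{}                           // new default: neither flag set
	universal := config{UniversalColdBlocks: true} // "universal"
	discard := config{DiscardColdBlocks: true}     // "discard"

	fmt.Println(classify(messages, false, true))   // old message: copied to coldstore
	fmt.Println(classify(messages, false, false))  // unreachable state: dropped
	fmt.Println(classify(universal, false, false)) // everything not hot is copied
	fmt.Println(classify(discard, false, false))   // nothing reaches the coldstore
}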
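Similarly, a minimal sketch of how the three ColdStoreType values map onto the flags set in node/modules/blockstore.go above; the helper name splitstoreConfigFor is hypothetical and only illustrates the wiring.

package main

import "fmt"

type splitstoreFlags struct {
	DiscardColdBlocks   bool
	UniversalColdBlocks bool
}

// splitstoreConfigFor maps Chainstore.Splitstore.ColdStoreType to the two
// booleans consumed by the splitstore, as wired up in SplitBlockstore.
func splitstoreConfigFor(coldStoreType string) splitstoreFlags {
	return splitstoreFlags{
		DiscardColdBlocks:   coldStoreType == "discard",
		UniversalColdBlocks: coldStoreType == "universal",
		// "messages" (the new default) sets neither flag: only messages and
		// receipts below the boundary are copied to the coldstore.
	}
}

func main() {
	for _, t := range []string{"universal", "messages", "discard"} {
		fmt.Printf("%-9s => %+v\n", t, splitstoreConfigFor(t))
	}
}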