feat:splitstore:single compaction that can handle prune aka two marksets one compaction #9571

Merged 9 commits on Nov 7, 2022
19 changes: 4 additions & 15 deletions blockstore/splitstore/splitstore.go
@@ -98,6 +98,10 @@ type Config struct {
// and directly purges cold blocks.
DiscardColdBlocks bool

// UniversalColdBlocks indicates whether all blocks being garbage collected and purged
// from the hotstore should be written to the cold store
UniversalColdBlocks bool

// HotstoreMessageRetention indicates the hotstore retention policy for messages.
// It has the following semantics:
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
@@ -111,21 +115,6 @@ type Config struct {
// A positive value is the number of compactions before a full GC is performed;
// a value of 1 will perform full GC in every compaction.
HotStoreFullGCFrequency uint64

// EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
// whereas hotstore compaction occurs every finality, pruning happens every 3 finalities
// Default is false
EnableColdStoreAutoPrune bool

// ColdStoreFullGCFrequency specifies how often to perform a full (moving) GC on the coldstore.
// Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
// full GC in every prune.
// Default is 7 (about once a week)
ColdStoreFullGCFrequency uint64

// ColdStoreRetention specifies the retention policy for data reachable from the chain, in
// finalities beyond the compaction boundary, default is 0, -1 retains everything
ColdStoreRetention int64
}

// ChainAccessor allows the Splitstore to access the chain. It will most likely
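For orientation, here is a minimal sketch of how the trimmed-down Config might be populated after this change. Only the field names and their documented meanings come from the diff above; the example values, the package path, and the omission of Config fields not touched by this PR are assumptions.

package example

import "github.com/filecoin-project/lotus/blockstore/splitstore"

// exampleConfig shows the surviving knobs; the values are illustrative only,
// and a real Config carries additional fields not shown in this diff.
func exampleConfig() splitstore.Config {
	return splitstore.Config{
		DiscardColdBlocks:        false, // keep a cold store
		UniversalColdBlocks:      true,  // every block purged from the hotstore is written to the cold store
		HotstoreMessageRetention: 0,     // retain messages only within the compaction boundary (4 finalities)
		HotStoreFullGCFrequency:  20,    // full (moving) GC on the hotstore every 20 compactions
	}
}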
2 changes: 1 addition & 1 deletion blockstore/splitstore/splitstore_check.go
@@ -125,7 +125,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}

return nil
})
}, func(cid.Cid) error { return nil })

if err != nil {
err = xerrors.Errorf("error walking chain: %w", err)
166 changes: 104 additions & 62 deletions blockstore/splitstore/splitstore_compact.go
@@ -20,7 +20,6 @@ import (

"github.com/filecoin-project/go-state-types/abi"

bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
Expand Down Expand Up @@ -134,39 +133,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
log.Infow("compaction done", "took", time.Since(start))
}()
// only prune if auto prune is enabled and after at least one compaction
} else if s.cfg.EnableColdStoreAutoPrune && epoch-s.pruneEpoch > PruneThreshold && s.compactionIndex > 0 {
s.beginTxnProtect()
s.compactType = cold
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
defer s.endTxnProtect()

log.Info("pruning splitstore")
start := time.Now()

var retainP func(int64) bool
switch {
case s.cfg.ColdStoreRetention > int64(0):
retainP = func(depth int64) bool {
return depth <= int64(CompactionBoundary)+s.cfg.ColdStoreRetention*int64(build.Finality)
}
case s.cfg.ColdStoreRetention < 0:
retainP = func(_ int64) bool { return true }
default:
retainP = func(depth int64) bool {
return depth <= int64(CompactionBoundary)
}
}
movingGC := s.cfg.ColdStoreFullGCFrequency > 0 && s.pruneIndex%int64(s.cfg.ColdStoreFullGCFrequency) == 0
var gcOpts []bstore.BlockstoreGCOption
if movingGC {
gcOpts = append(gcOpts, bstore.WithFullGC(true))
}
doGC := func() error { return s.gcBlockstore(s.cold, gcOpts) }

s.prune(curTs, retainP, doGC)
log.Infow("prune done", "took", time.Since(start))
}()
} else {
// no compaction necessary
atomic.StoreInt32(&s.compacting, 0)
@@ -562,6 +528,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer markSet.Close() //nolint:errcheck
defer s.debug.Flush()

coldSet, err := s.markSetEnv.New("cold", s.markSetSize)
if err != nil {
return xerrors.Errorf("error creating cold mark set: %w", err)
}
defer coldSet.Close() //nolint:errcheck

if err := s.checkClosing(); err != nil {
return err
}
@@ -580,24 +552,52 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
startMark := time.Now()

count := new(int64)
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
coldCount := new(int64)
fCold := func(c cid.Cid) error {
// Writes to the cold set are optimized away in universal and discard mode:
//
// in discard mode nothing gets written to the cold store, so there are no cold objects to track;
// in universal mode everything not marked hot gets written to the cold store, so there is no need to track cold objects separately
if s.cfg.DiscardColdBlocks || s.cfg.UniversalColdBlocks {
return nil
}

if !visit {
return errStopWalk
}
if isUnitaryObject(c) {
return errStopWalk
}

atomic.AddInt64(count, 1)
return nil
})
visit, err := coldSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}

if !visit {
return errStopWalk
}

atomic.AddInt64(coldCount, 1)
return nil
}
fHot := func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}

if !visit {
return errStopWalk
}

atomic.AddInt64(count, 1)
return nil
}

err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)

if err != nil {
return xerrors.Errorf("error marking: %w", err)
@@ -631,8 +631,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
defer coldw.Close() //nolint:errcheck

purgew, err := NewColdSetWriter(s.discardSetPath())
if err != nil {
return xerrors.Errorf("error creating deadset: %w", err)
}
defer purgew.Close() //nolint:errcheck

// some stats for logging
var hotCnt, coldCnt int
var hotCnt, coldCnt, purgeCnt int
err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
Expand All @@ -645,9 +651,27 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
}

// it's cold, mark it as candidate for move
// it needs to be removed from hot store, mark it as candidate for purge
if err := purgew.Write(c); err != nil {
return xerrors.Errorf("error writing cid to purge set: %w", err)
}
purgeCnt++

coldMark, err := coldSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking cold mark set for %s: %w", c, err)
}

// Discard mode: coldMark == false, s.cfg.UniversalColdBlocks == false, always return here, no writes to cold store
// Universal mode: coldMark == false, s.cfg.UniversalColdBlocks == true, never stop here, all writes to cold store
// Otherwise: s.cfg.UniversalColdBlocks == false, if !coldMark stop here and don't write to cold store, if coldMark continue and write to cold store
if !coldMark && !s.cfg.UniversalColdBlocks { // universal mode means mark everything as cold
return nil
}

// it's cold, mark as candidate for move
if err := coldw.Write(c); err != nil {
return xerrors.Errorf("error writing cid to coldstore: %w", err)
return xerrors.Errorf("error writing cid to cold set")
}
coldCnt++
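The coldMark check above reduces to a small predicate. Here is a hedged restatement, with an illustrative function name of my own choosing rather than anything from the PR:

package example

// shouldWriteCold restates the check in the loop above: a block that is not
// hot-marked is copied to the cold store only if it was marked cold during
// the walk, or if UniversalColdBlocks is set. In discard mode coldMark is
// always false (the cold callback is a no-op) and UniversalColdBlocks is
// false, so nothing reaches the cold store.
func shouldWriteCold(coldMark, universalColdBlocks bool) bool {
	return coldMark || universalColdBlocks
}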

@@ -656,7 +680,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
if err != nil {
return xerrors.Errorf("error collecting cold objects: %w", err)
}

if err := purgew.Close(); err != nil {
return xerrors.Errorf("erroring closing purgeset: %w", err)
}
if err := coldw.Close(); err != nil {
return xerrors.Errorf("error closing coldset: %w", err)
}
@@ -705,6 +731,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
}

purger, err := NewColdSetReader(s.discardSetPath())
if err != nil {
return xerrors.Errorf("error opening coldset: %w", err)
}
defer purger.Close() //nolint:errcheck

// 4. Purge cold objects with checkpointing for recovery.
// This is the critical section of compaction, whereby any cold object not in the markSet is
// considered already deleted.
@@ -736,7 +768,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// 5. purge cold objects from the hotstore, taking protected references into account
log.Info("purging cold objects from the hotstore")
startPurge := time.Now()
err = s.purge(coldr, checkpoint, markSet)
err = s.purge(purger, checkpoint, markSet)
if err != nil {
return xerrors.Errorf("error purging cold objects: %w", err)
}
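Taken together, compaction now produces two on-disk sets: a discard set that drives the purge from the hotstore, and a cold set listing which of those blocks must first be copied to the cold store. A simplified sketch of that relationship, with plain strings standing in for CIDs and names of my own choosing:

package example

// splitSets illustrates how the two sets relate: the discard set holds every
// hotstore block that was not hot-marked and drives the purge; the cold set
// is the subset that must first be copied to the cold store (empty in discard
// mode, equal to the discard set in universal mode).
func splitSets(notHotMarked, coldMarked map[string]bool, universal bool) (discard, cold []string) {
	for c := range notHotMarked {
		discard = append(discard, c) // always a purge candidate
		if universal || coldMarked[c] {
			cold = append(cold, c) // also moved to the cold store before the purge
		}
	}
	return discard, cold
}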
@@ -864,7 +896,7 @@ func (s *SplitStore) endCriticalSection() {
}

func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
visitor ObjectVisitor, f func(cid.Cid) error) error {
visitor ObjectVisitor, fHot, fCold func(cid.Cid) error) error {
var walked ObjectVisitor
var mx sync.Mutex
// we copy the tipset first into a new slice, which allows us to reuse it in every epoch.
Expand All @@ -886,7 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp

atomic.AddInt64(walkCnt, 1)

if err := f(c); err != nil {
if err := fHot(c); err != nil {
return err
}

Expand All @@ -904,27 +936,37 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil {
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}

if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil {
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
} else {
if err := s.walkObject(hdr.Messages, visitor, f); err != nil {
if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}

if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil {
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
}
}

// messages and receipts outside of inclMsgs are included in the cold store
if hdr.Height < inclMsgs && hdr.Height > 0 {
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
}

// state is only retained if within the inclState boundary, with the exception of genesis
if hdr.Height >= inclState || hdr.Height == 0 {
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
atomic.AddInt64(scanCnt, 1)
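A simplified restatement of the per-header routing introduced here: recent messages/receipts and in-boundary state go to the hot callback, while older messages/receipts go to the cold callback so they can be tracked for the cold store. The header type, the walk helpers, and the hot-message condition are assumptions; the cold-message and state conditions mirror the diff.

package example

// header is a simplified stand-in for a block header: just the height and
// the root CIDs (as strings) that the walk cares about.
type header struct {
	height    int64
	messages  string
	receipts  string
	stateRoot string
}

// routeHeader sketches the routing in the reworked walkChain.
func routeHeader(hdr header, inclMsgs, inclState int64, walkHot, walkCold func(root string)) {
	if hdr.height >= inclMsgs || hdr.height == 0 {
		// messages and receipts within the retention window stay hot
		// (this condition is inferred, not shown in the excerpt above)
		walkHot(hdr.messages)
		walkHot(hdr.receipts)
	}
	if hdr.height < inclMsgs && hdr.height > 0 {
		// older messages and receipts are walked with the cold callback
		walkCold(hdr.messages)
		walkCold(hdr.receipts)
	}
	if hdr.height >= inclState || hdr.height == 0 {
		// state is only retained within the inclState boundary, plus genesis
		walkHot(hdr.stateRoot)
	}
}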
@@ -1296,7 +1338,7 @@ func (s *SplitStore) coldSetPath() string {
return filepath.Join(s.path, "coldset")
}

func (s *SplitStore) deadSetPath() string {
func (s *SplitStore) discardSetPath() string {
return filepath.Join(s.path, "deadset")
Comment on lines -1299 to 1342

Contributor: Just sanity checking - we're not un-renaming the function because deadSet is now not quite what it used to mean?

Contributor Author: Yeah, it now has a use in compaction, not just prune, and this name is better suited to that domain.
}

14 changes: 7 additions & 7 deletions blockstore/splitstore/splitstore_prune.go
@@ -208,7 +208,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
log.Info("collecting dead objects")
startCollect := time.Now()

deadw, err := NewColdSetWriter(s.deadSetPath())
deadw, err := NewColdSetWriter(s.discardSetPath())
if err != nil {
return xerrors.Errorf("error creating coldset: %w", err)
}
@@ -267,7 +267,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
return err
}

deadr, err := NewColdSetReader(s.deadSetPath())
deadr, err := NewColdSetReader(s.discardSetPath())
if err != nil {
return xerrors.Errorf("error opening deadset: %w", err)
}
@@ -311,10 +311,10 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
log.Warnf("error removing checkpoint: %s", err)
}
if err := deadr.Close(); err != nil {
log.Warnf("error closing deadset: %s", err)
log.Warnf("error closing discard set: %s", err)
}
if err := os.Remove(s.deadSetPath()); err != nil {
log.Warnf("error removing deadset: %s", err)
if err := os.Remove(s.discardSetPath()); err != nil {
log.Warnf("error removing discard set: %s", err)
}

// we are done; do some housekeeping
@@ -344,7 +344,7 @@ func (s *SplitStore) completePrune() error {
}
defer checkpoint.Close() //nolint:errcheck

deadr, err := NewColdSetReader(s.deadSetPath())
deadr, err := NewColdSetReader(s.discardSetPath())
if err != nil {
return xerrors.Errorf("error opening deadset: %w", err)
}
@@ -378,7 +378,7 @@ func (s *SplitStore) completePrune() error {
if err := deadr.Close(); err != nil {
log.Warnf("error closing deadset: %s", err)
}
if err := os.Remove(s.deadSetPath()); err != nil {
if err := os.Remove(s.discardSetPath()); err != nil {
log.Warnf("error removing deadset: %s", err)
}
