Skip to content

Commit

Permalink
export range and small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zengroundumbass committed May 11, 2022
1 parent 33a05d3 commit acaecc5
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 9 deletions.
2 changes: 2 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

ChainExportRange(ctx context.Context, oldmsgskip bool, from, to types.TipSetKey) (<-chan []byte, error) //perm:read

// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
Expand Down
Binary file not shown.
Binary file added api/v0api/gomock_reflect_3555711957/output
Binary file not shown.
22 changes: 18 additions & 4 deletions chain/store/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ func (cs *ChainStore) UnionStore() bstore.Blockstore {
}

func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
pleaseVisit := func(b types.BlockHeader) bool {
return b.Height > ts.Height()-inclRecentRoots
}
return cs.exportIf(ctx, ts, pleaseVisit, skipOldMsgs, w)
}

func (cs *ChainStore) ExportRange(ctx context.Context, from, to *types.TipSet, skipOldMsgs bool, w io.Writer) error {
pleaseVisit := func(b types.BlockHeader) bool {
return b.Height >= from.Height() && b.Height < to.Height()
}
return cs.exportIf(ctx, to, pleaseVisit, skipOldMsgs, w)
}

func (cs *ChainStore) exportIf(ctx context.Context, ts *types.TipSet, visit func(b types.BlockHeader) bool, skipOldMsgs bool, w io.Writer) error {
h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
Expand All @@ -33,7 +47,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
}

unionBs := cs.UnionStore()
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
return cs.WalkSnapshot(ctx, ts, visit, skipOldMsgs, true, func(c cid.Cid) error {
blk, err := unionBs.Get(ctx, c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
Expand Down Expand Up @@ -66,7 +80,7 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
return root, nil
}

func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, visit func(b types.BlockHeader) bool, skipOldMsgs bool, skipMsgReceipts bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
Expand Down Expand Up @@ -104,7 +118,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
}

var cids []cid.Cid
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
if !skipOldMsgs || visit(b) {
if walked.Visit(b.Messages) {
mcids, err := recurseLinks(ctx, cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil {
Expand All @@ -125,7 +139,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe

out := cids

if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
if b.Height == 0 || visit(b) {
if walked.Visit(b.ParentStateRoot) {
cids, err := recurseLinks(ctx, cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil {
Expand Down
32 changes: 27 additions & 5 deletions node/impl/full/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,39 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M
return cm.VMMessage(), nil
}

func (a *ChainAPI) ChainExportRange(ctx context.Context, skipoldmsgs bool, from, to types.TipSetKey) (<-chan []byte, error) {
export := func(w io.Writer) error {
from, err := a.Chain.GetTipSetFromKey(ctx, from)
if err != nil {
return xerrors.Errorf("loading from tipset %s: %w", from, err)
}
to, err := a.Chain.GetTipSetFromKey(ctx, to)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", to, err)
}
return a.Chain.ExportRange(ctx, from, to, skipoldmsgs, w)
}
return a.chainExport(ctx, export)
}

func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) {
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
export := func(w io.Writer) error {
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Chain.Export(ctx, ts, nroots, skipoldmsgs, w)
}
return a.chainExport(ctx, export)
}

func (a *ChainAPI) chainExport(ctx context.Context, export func(w io.Writer) error) (<-chan []byte, error) {

r, w := io.Pipe()
out := make(chan []byte)
go func() {
bw := bufio.NewWriterSize(w, 1<<20)

err := a.Chain.Export(ctx, ts, nroots, skipoldmsgs, bw)
err := export(bw)
bw.Flush() //nolint:errcheck // it is a write to a pipe
w.CloseWithError(err) //nolint:errcheck // it is a pipe
}()
Expand Down

0 comments on commit acaecc5

Please sign in to comment.