diff --git a/cmd/downloader/root.go b/cmd/downloader/root.go index bc20e1b5088..eef55031600 100644 --- a/cmd/downloader/root.go +++ b/cmd/downloader/root.go @@ -18,48 +18,35 @@ import ( "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" - "github.com/urfave/cli" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" ) +var ( + datadir string + seeding bool + downloaderApiAddr string +) + func init() { flags := append(debug.Flags, utils.MetricFlags...) - flags = append(flags, Addr, DataDir) utils.CobraFlags(rootCmd, flags) - rootCmd.PersistentFlags().Bool("seeding", true, "Seed snapshots") -} - -var ( - Addr = cli.StringFlag{ - Name: "downloader.api.addr", - Usage: "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface", - Value: "127.0.0.1:9093", - } - DataDir = cli.StringFlag{ - Name: utils.DataDirFlag.Name, - Usage: utils.DataDirFlag.Usage, - Value: paths.DefaultDataDir(), + rootCmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage) + if err := rootCmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil { + panic(err) } -) -type Config struct { - Addr string - Dir string - Seeding bool + rootCmd.PersistentFlags().BoolVar(&seeding, "seeding", true, "Seed snapshots") + rootCmd.Flags().StringVar(&downloaderApiAddr, "downloader.api.addr", "127.0.0.1:9093", "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface") } func main() { ctx, cancel := utils.RootContext() defer cancel() - if err := rootCmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil { - panic(err) - } - if err := rootCmd.ExecuteContext(ctx); err != nil { fmt.Println(err) os.Exit(1) @@ -78,79 +65,62 @@ var rootCmd = &cobra.Command{ PersistentPostRun: func(cmd *cobra.Command, args []string) { debug.Exit() }, - RunE: runDownloader, -} + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + snapshotsDir := path.Join(datadir, "snapshots") + log.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", datadir, "seeding", seeding) -func runDownloader(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - cfg := &Config{} - var err error - cfg.Addr, err = cmd.Flags().GetString(Addr.Name) - if err != nil { - return err - } - dir, err := cmd.Flags().GetString(DataDir.Name) - if err != nil { - return err - } - cfg.Dir = path.Join(dir, "snapshots") - cfg.Seeding, err = cmd.Flags().GetBool("seeding") - if err != nil { - return err - } - log.Info("Run snapshot downloader", "addr", cfg.Addr, "dir", cfg.Dir, "seeding", cfg.Seeding) - - bittorrentServer, err := snapshotsync.NewServer(cfg.Dir, cfg.Seeding) - if err != nil { - return fmt.Errorf("new server: %w", err) - } - log.Info("Load") - err = bittorrentServer.Load() - if err != nil { - return fmt.Errorf("load: %w", err) - } - - go func() { - _, err := bittorrentServer.Download(ctx, &proto_snap.DownloadSnapshotRequest{ - NetworkId: params.MainnetChainConfig.ChainID.Uint64(), - Type: snapshotsync.GetAvailableSnapshotTypes(params.MainnetChainConfig.ChainID.Uint64()), - }) + bittorrentServer, err := snapshotsync.NewServer(snapshotsDir, seeding) if err != nil { - log.Error("Download failed", "err", err, "networkID", params.MainnetChainConfig.ChainID.Uint64()) + return fmt.Errorf("new server: %w", err) + } + log.Info("Load") + err = bittorrentServer.Load() + if err != nil { + return fmt.Errorf("load: %w", err) } - }() - go func() { - for { - select { - case <-cmd.Context().Done(): - return - default: - } - snapshots, err := bittorrentServer.Snapshots(ctx, &proto_snap.SnapshotsRequest{ + go func() { + _, err := bittorrentServer.Download(ctx, &proto_snap.DownloadSnapshotRequest{ NetworkId: params.MainnetChainConfig.ChainID.Uint64(), + Type: snapshotsync.GetAvailableSnapshotTypes(params.MainnetChainConfig.ChainID.Uint64()), }) if err != nil { - log.Error("get snapshots", "err", err) - time.Sleep(time.Minute) - continue + log.Error("Download failed", "err", err, "networkID", params.MainnetChainConfig.ChainID.Uint64()) } - stats := bittorrentServer.Stats(context.Background()) - for _, v := range snapshots.Info { - log.Info("Snapshot "+v.Type.String(), "%", v.Readiness, "peers", stats[v.Type.String()].ConnectedSeeders) + }() + go func() { + for { + select { + case <-cmd.Context().Done(): + return + default: + } + + snapshots, err := bittorrentServer.Snapshots(ctx, &proto_snap.SnapshotsRequest{ + NetworkId: params.MainnetChainConfig.ChainID.Uint64(), + }) + if err != nil { + log.Error("get snapshots", "err", err) + time.Sleep(time.Minute) + continue + } + stats := bittorrentServer.Stats(context.Background()) + for _, v := range snapshots.Info { + log.Info("Snapshot "+v.Type.String(), "%", v.Readiness, "peers", stats[v.Type.String()].ConnectedSeeders) + } + time.Sleep(time.Minute) } - time.Sleep(time.Minute) + }() + grpcServer, err := StartGrpc(bittorrentServer, downloaderApiAddr, nil) + if err != nil { + return err } - }() - grpcServer, err := StartGrpc(bittorrentServer, cfg.Addr, nil) - if err != nil { - return err - } - <-cmd.Context().Done() - grpcServer.GracefulStop() - - return nil + <-cmd.Context().Done() + grpcServer.GracefulStop() + return nil + }, } func StartGrpc(snServer *snapshotsync.SNDownloaderServer, addr string, creds *credentials.TransportCredentials) (*grpc.Server, error) { diff --git a/cmd/downloader/utils/utils.go b/cmd/downloader/utils/utils.go deleted file mode 100644 index ed7930f9d67..00000000000 --- a/cmd/downloader/utils/utils.go +++ /dev/null @@ -1,20 +0,0 @@ -package utils - -import ( - "errors" - "os" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/log/v3" -) - -var ErrUnsupported error = errors.New("unsupported KV type") - -func RmTmpFiles(snapshotPath string) error { - return os.Remove(snapshotPath + "/mdbx.lck") -} - -func OpenSnapshotKV(configsFunc mdbx.TableCfgFunc, path string) kv.RwDB { - return mdbx.NewMDBX(log.New()).WithTablessCfg(configsFunc).Path(path).MustOpen() -} diff --git a/turbo/snapshotsync/downloader.go b/turbo/snapshotsync/downloader.go index 0d42d0688d6..fecfe84c631 100644 --- a/turbo/snapshotsync/downloader.go +++ b/turbo/snapshotsync/downloader.go @@ -9,18 +9,17 @@ import ( "path/filepath" "time" + lg "github.com/anacrolix/log" "github.com/anacrolix/torrent/bencode" "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/log/v3" - lg "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" - "golang.org/x/sync/errgroup" ) type Client struct { @@ -55,8 +54,8 @@ func DefaultTorrentConfig() *torrent.ClientConfig { torrentConfig.NoDHT = true torrentConfig.DisableTrackers = false torrentConfig.Debug = false - torrentConfig.Logger = torrentConfig.Logger.FilterLevel(lg.Debug) torrentConfig.Logger = NewAdapterLogger() + torrentConfig.Logger = torrentConfig.Logger.FilterLevel(lg.Info) return torrentConfig } @@ -118,47 +117,44 @@ func (cli *Client) AddTorrentSpec(snapshotName string, snapshotHash metainfo.Has return t, err } -func (cli *Client) AddTorrent(ctx context.Context, db kv.RwTx, snapshotType snapshotsync.SnapshotType, networkID uint64) error { //nolint: interfacer - infoHashBytes, infoBytes, err := getTorrentSpec(db, snapshotType.String(), networkID) - if err != nil { - return err - } - var infoHash metainfo.Hash +type torrentSpecFromDb struct { + exists bool + snapshotType snapshotsync.SnapshotType + networkID uint64 + infoHash torrent.InfoHash + infoBytes []byte +} + +func (cli *Client) AddTorrent(ctx context.Context, spec *torrentSpecFromDb) (*torrentSpecFromDb, error) { //nolint: interfacer newTorrent := false - if infoHashBytes != nil { - copy(infoHash[:], infoHashBytes[:metainfo.HashSize]) - } else { - log.Info("Init new torrent", "snapshot", snapshotType.String()) + if !spec.exists { + log.Info("Init new torrent", "snapshot", spec.snapshotType.String()) newTorrent = true var ok bool - infoHash, ok = TorrentHashes[networkID][snapshotType] + spec.infoHash, ok = TorrentHashes[spec.networkID][spec.snapshotType] if !ok { - return fmt.Errorf("%w type %v, networkID %v", ErrInvalidSnapshot, snapshotType, networkID) + return nil, fmt.Errorf("%w type %v, networkID %v", ErrInvalidSnapshot, spec.snapshotType, spec.networkID) } } - log.Info("Added torrent spec", "snapshot", snapshotType.String(), "hash", infoHash.String()) - t, err := cli.AddTorrentSpec(snapshotType.String(), infoHash, infoBytes) + log.Info("Added torrent spec", "snapshot", spec.snapshotType.String(), "hash", spec.infoHash.String()) + t, err := cli.AddTorrentSpec(spec.snapshotType.String(), spec.infoHash, spec.infoBytes) if err != nil { - return fmt.Errorf("error on add snapshot: %w", err) + return nil, fmt.Errorf("error on add snapshot: %w", err) } - log.Info("Getting infobytes", "snapshot", snapshotType.String()) - infoBytes, err = cli.GetInfoBytes(context.Background(), infoHash) + log.Info("Getting infobytes", "snapshot", spec.snapshotType.String()) + spec.infoBytes, err = cli.GetInfoBytes(context.Background(), spec.infoHash) if err != nil { - log.Warn("Init failure", "snapshot", snapshotType.String(), "err", ctx.Err()) - return fmt.Errorf("error on get info bytes: %w", err) + log.Warn("Init failure", "snapshot", spec.snapshotType.String(), "err", ctx.Err()) + return nil, fmt.Errorf("error on get info bytes: %w", err) } t.AllowDataDownload() t.DownloadAll() - log.Info("Got infobytes", "snapshot", snapshotType.String(), "file", t.Files()[0].Path()) + log.Info("Got infobytes", "snapshot", spec.snapshotType.String(), "file", t.Files()[0].Path()) if newTorrent { - log.Info("Save spec", "snapshot", snapshotType.String()) - err := saveTorrentSpec(db, snapshotType.String(), networkID, infoHash, infoBytes) - if err != nil { - return err - } + return spec, nil } - return nil + return nil, nil } func (cli *Client) GetInfoBytes(ctx context.Context, snapshotHash metainfo.Hash) ([]byte, error) { @@ -179,42 +175,6 @@ func (cli *Client) GetInfoBytes(ctx context.Context, snapshotHash metainfo.Hash) } } -func (cli *Client) AddSnapshotsTorrents(ctx context.Context, db kv.RwTx, networkId uint64, mode SnapshotMode) error { - ctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() - eg := errgroup.Group{} - - if mode.Headers { - eg.Go(func() error { - return cli.AddTorrent(ctx, db, snapshotsync.SnapshotType_headers, networkId) - }) - } - - if mode.Bodies { - eg.Go(func() error { - return cli.AddTorrent(ctx, db, snapshotsync.SnapshotType_bodies, networkId) - }) - } - - if mode.State { - eg.Go(func() error { - return cli.AddTorrent(ctx, db, snapshotsync.SnapshotType_state, networkId) - }) - } - - if mode.Receipts { - eg.Go(func() error { - return cli.AddTorrent(ctx, db, snapshotsync.SnapshotType_receipts, networkId) - }) - } - err := eg.Wait() - if err != nil { - return err - } - - return nil -} - func (cli *Client) Download() { log.Info("Start snapshot downloading") torrents := cli.Cli.Torrents() @@ -332,30 +292,43 @@ func (cli *Client) StopSeeding(hash metainfo.Hash) error { return nil } -func getTorrentSpec(db kv.Tx, snapshotName string, networkID uint64) ([]byte, []byte, error) { - var infohash, infobytes []byte +func getTorrentSpec(db kv.Tx, snapshotName snapshotsync.SnapshotType, networkID uint64) (*torrentSpecFromDb, error) { + snapshotNameS := snapshotName.String() + var infoHashBytes, infobytes []byte var err error b := make([]byte, 8) binary.BigEndian.PutUint64(b, networkID) - infohash, err = db.GetOne(kv.SnapshotInfo, MakeInfoHashKey(snapshotName, networkID)) + infoHashBytes, err = db.GetOne(kv.SnapshotInfo, MakeInfoHashKey(snapshotNameS, networkID)) if err != nil { - return nil, nil, err + return nil, err + } + var infoHash metainfo.Hash + if infoHashBytes != nil { + copy(infoHash[:], infoHashBytes[:metainfo.HashSize]) } - infobytes, err = db.GetOne(kv.SnapshotInfo, MakeInfoBytesKey(snapshotName, networkID)) + + infobytes, err = db.GetOne(kv.SnapshotInfo, MakeInfoBytesKey(snapshotNameS, networkID)) if err != nil { - return nil, nil, err + return nil, err } - return infohash, infobytes, nil + return &torrentSpecFromDb{ + exists: infoHashBytes != nil, + snapshotType: snapshotName, + networkID: networkID, + infoHash: infoHash, + infoBytes: infobytes, + }, nil } -func saveTorrentSpec(db kv.Putter, snapshotName string, networkID uint64, hash torrent.InfoHash, infobytes []byte) error { +func saveTorrentSpec(db kv.Putter, spec *torrentSpecFromDb) error { + snapshotNameS := spec.snapshotType.String() b := make([]byte, 8) - binary.BigEndian.PutUint64(b, networkID) - err := db.Put(kv.SnapshotInfo, MakeInfoHashKey(snapshotName, networkID), hash.Bytes()) + binary.BigEndian.PutUint64(b, spec.networkID) + err := db.Put(kv.SnapshotInfo, MakeInfoHashKey(snapshotNameS, spec.networkID), spec.infoHash.Bytes()) if err != nil { return err } - return db.Put(kv.SnapshotInfo, MakeInfoBytesKey(snapshotName, networkID), infobytes) + return db.Put(kv.SnapshotInfo, MakeInfoBytesKey(snapshotNameS, spec.networkID), spec.infoBytes) } func MakeInfoHashKey(snapshotName string, networkID uint64) []byte { diff --git a/turbo/snapshotsync/server.go b/turbo/snapshotsync/server.go index 5be5a839018..857249ed7bc 100644 --- a/turbo/snapshotsync/server.go +++ b/turbo/snapshotsync/server.go @@ -4,11 +4,14 @@ import ( "context" "errors" "fmt" + "time" "github.com/anacrolix/torrent" "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" ) @@ -55,11 +58,133 @@ type SNDownloaderServer struct { } func (s *SNDownloaderServer) Download(ctx context.Context, request *snapshotsync.DownloadSnapshotRequest) (*emptypb.Empty, error) { - if err := s.db.Update(ctx, func(tx kv.RwTx) error { - return s.t.AddSnapshotsTorrents(ctx, tx, request.NetworkId, FromSnapshotTypes(request.Type)) + ctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() + eg := errgroup.Group{} + + networkId := request.NetworkId + mode := FromSnapshotTypes(request.Type) + + var headerSpec, bodySpec, stateSpec, receiptSpec *torrentSpecFromDb + + if err := s.db.View(ctx, func(tx kv.Tx) error { + var err error + headerSpec, err = getTorrentSpec(tx, snapshotsync.SnapshotType_headers, networkId) + if err != nil { + return err + } + bodySpec, err = getTorrentSpec(tx, snapshotsync.SnapshotType_bodies, networkId) + if err != nil { + return err + } + stateSpec, err = getTorrentSpec(tx, snapshotsync.SnapshotType_state, networkId) + if err != nil { + return err + } + receiptSpec, err = getTorrentSpec(tx, snapshotsync.SnapshotType_receipts, networkId) + if err != nil { + return err + } + return nil }); err != nil { return nil, err } + + if mode.Headers { + eg.Go(func() error { + var newSpec *torrentSpecFromDb + var err error + newSpec, err = s.t.AddTorrent(ctx, headerSpec) + if err != nil { + return fmt.Errorf("add torrent: %w", err) + } + if newSpec != nil { + log.Info("Save spec", "snapshot", newSpec.snapshotType.String()) + if err = s.db.Update(ctx, func(tx kv.RwTx) error { + return saveTorrentSpec(tx, newSpec) + }); err != nil { + return err + } + if err != nil { + return err + } + } + return nil + }) + } + + if mode.Bodies { + eg.Go(func() error { + var newSpec *torrentSpecFromDb + var err error + newSpec, err = s.t.AddTorrent(ctx, bodySpec) + if err != nil { + return fmt.Errorf("add torrent: %w", err) + } + if newSpec != nil { + log.Info("Save spec", "snapshot", newSpec.snapshotType.String()) + if err = s.db.Update(ctx, func(tx kv.RwTx) error { + return saveTorrentSpec(tx, newSpec) + }); err != nil { + return err + } + if err != nil { + return err + } + } + return nil + }) + } + + if mode.State { + eg.Go(func() error { + var newSpec *torrentSpecFromDb + var err error + newSpec, err = s.t.AddTorrent(ctx, stateSpec) + if err != nil { + return fmt.Errorf("add torrent: %w", err) + } + if newSpec != nil { + log.Info("Save spec", "snapshot", newSpec.snapshotType.String()) + if err = s.db.Update(ctx, func(tx kv.RwTx) error { + return saveTorrentSpec(tx, newSpec) + }); err != nil { + return err + } + if err != nil { + return err + } + } + return nil + }) + } + + if mode.Receipts { + eg.Go(func() error { + var newSpec *torrentSpecFromDb + var err error + newSpec, err = s.t.AddTorrent(ctx, receiptSpec) + if err != nil { + return fmt.Errorf("add torrent: %w", err) + } + if newSpec != nil { + log.Info("Save spec", "snapshot", newSpec.snapshotType.String()) + if err = s.db.Update(ctx, func(tx kv.RwTx) error { + return saveTorrentSpec(tx, newSpec) + }); err != nil { + return err + } + if err != nil { + return err + } + } + return nil + }) + } + err := eg.Wait() + if err != nil { + return nil, err + } return &emptypb.Empty{}, nil } func (s *SNDownloaderServer) Load() error { diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index bc4de4dc43a..5d7b74f5e66 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -1,13 +1,9 @@ package snapshotsync import ( - "context" - "time" - "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync" "github.com/ledgerwatch/erigon-lib/kv" kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/snapshotdb" "github.com/ledgerwatch/log/v3" ) @@ -60,122 +56,3 @@ func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Snapsh return snKV.Open(), nil } - -func DownloadSnapshots(torrentClient *Client, ExternalSnapshotDownloaderAddr string, networkID uint64, snapshotMode SnapshotMode, chainDb ethdb.Database) error { - var downloadedSnapshots map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo - if ExternalSnapshotDownloaderAddr != "" { - cli, cl, innerErr := NewClient(ExternalSnapshotDownloaderAddr) - if innerErr != nil { - return innerErr - } - defer cl() //nolint - - _, innerErr = cli.Download(context.Background(), &snapshotsync.DownloadSnapshotRequest{ - NetworkId: networkID, - Type: snapshotMode.ToSnapshotTypes(), - }) - if innerErr != nil { - return innerErr - } - - waitDownload := func() (map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo, error) { - snapshotReadinessCheck := func(mp map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo, tp snapshotsync.SnapshotType) bool { - if mp[tp].Readiness != int32(100) { - log.Info("Downloading", "snapshot", tp, "%", mp[tp].Readiness) - return false - } - return true - } - for { - downloadedSnapshots = make(map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo) - snapshots, err1 := cli.Snapshots(context.Background(), &snapshotsync.SnapshotsRequest{NetworkId: networkID}) - if err1 != nil { - return nil, err1 - } - for i := range snapshots.Info { - if downloadedSnapshots[snapshots.Info[i].Type].SnapshotBlock < snapshots.Info[i].SnapshotBlock && snapshots.Info[i] != nil { - downloadedSnapshots[snapshots.Info[i].Type] = snapshots.Info[i] - } - } - - downloaded := true - if snapshotMode.Headers { - if !snapshotReadinessCheck(downloadedSnapshots, snapshotsync.SnapshotType_headers) { - downloaded = false - } - } - if snapshotMode.Bodies { - if !snapshotReadinessCheck(downloadedSnapshots, snapshotsync.SnapshotType_bodies) { - downloaded = false - } - } - if snapshotMode.State { - if !snapshotReadinessCheck(downloadedSnapshots, snapshotsync.SnapshotType_state) { - downloaded = false - } - } - if snapshotMode.Receipts { - if !snapshotReadinessCheck(downloadedSnapshots, snapshotsync.SnapshotType_receipts) { - downloaded = false - } - } - if downloaded { - return downloadedSnapshots, nil - } - time.Sleep(time.Second * 10) - } - } - downloadedSnapshots, innerErr := waitDownload() - if innerErr != nil { - return innerErr - } - snapshotKV := chainDb.(ethdb.HasRwKV).RwKV() - - snapshotKV, innerErr = WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots) - if innerErr != nil { - return innerErr - } - chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV) - - if err := PostProcessing(chainDb.RwKV(), downloadedSnapshots); err != nil { - return err - } - - } else { - if err := chainDb.RwKV().Update(context.Background(), func(tx kv.RwTx) error { - err := torrentClient.Load(tx) - if err != nil { - return err - } - return torrentClient.AddSnapshotsTorrents(context.Background(), tx, networkID, snapshotMode) - }); err != nil { - log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err) - } else { - torrentClient.Download() - var innerErr error - var downloadedSnapshots map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo - if err := chainDb.RwKV().View(context.Background(), func(tx kv.Tx) (err error) { - downloadedSnapshots, err = torrentClient.GetSnapshots(tx, networkID) - if err != nil { - return err - } - return nil - }); err != nil { - return err - } - - snapshotKV := chainDb.(ethdb.HasRwKV).RwKV() - snapshotKV, innerErr = WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots) - if innerErr != nil { - return innerErr - } - chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV) - if err := PostProcessing(snapshotKV, downloadedSnapshots); err != nil { - - return err - } - } - - } - return nil -}