Skip to content

Commit

Permalink
first revisit downloader service (#3093)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Dec 6, 2021
1 parent b7aa5be commit 3f34dee
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 308 deletions.
142 changes: 56 additions & 86 deletions cmd/downloader/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,35 @@ import (
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/urfave/cli"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)

var (
datadir string
seeding bool
downloaderApiAddr string
)

func init() {
flags := append(debug.Flags, utils.MetricFlags...)
flags = append(flags, Addr, DataDir)
utils.CobraFlags(rootCmd, flags)

rootCmd.PersistentFlags().Bool("seeding", true, "Seed snapshots")
}

var (
Addr = cli.StringFlag{
Name: "downloader.api.addr",
Usage: "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface",
Value: "127.0.0.1:9093",
}
DataDir = cli.StringFlag{
Name: utils.DataDirFlag.Name,
Usage: utils.DataDirFlag.Usage,
Value: paths.DefaultDataDir(),
rootCmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
if err := rootCmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil {
panic(err)
}
)

type Config struct {
Addr string
Dir string
Seeding bool
rootCmd.PersistentFlags().BoolVar(&seeding, "seeding", true, "Seed snapshots")
rootCmd.Flags().StringVar(&downloaderApiAddr, "downloader.api.addr", "127.0.0.1:9093", "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface")
}

func main() {
ctx, cancel := utils.RootContext()
defer cancel()

if err := rootCmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil {
panic(err)
}

if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -78,79 +65,62 @@ var rootCmd = &cobra.Command{
PersistentPostRun: func(cmd *cobra.Command, args []string) {
debug.Exit()
},
RunE: runDownloader,
}
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
snapshotsDir := path.Join(datadir, "snapshots")
log.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", datadir, "seeding", seeding)

func runDownloader(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
cfg := &Config{}
var err error
cfg.Addr, err = cmd.Flags().GetString(Addr.Name)
if err != nil {
return err
}
dir, err := cmd.Flags().GetString(DataDir.Name)
if err != nil {
return err
}
cfg.Dir = path.Join(dir, "snapshots")
cfg.Seeding, err = cmd.Flags().GetBool("seeding")
if err != nil {
return err
}
log.Info("Run snapshot downloader", "addr", cfg.Addr, "dir", cfg.Dir, "seeding", cfg.Seeding)

bittorrentServer, err := snapshotsync.NewServer(cfg.Dir, cfg.Seeding)
if err != nil {
return fmt.Errorf("new server: %w", err)
}
log.Info("Load")
err = bittorrentServer.Load()
if err != nil {
return fmt.Errorf("load: %w", err)
}

go func() {
_, err := bittorrentServer.Download(ctx, &proto_snap.DownloadSnapshotRequest{
NetworkId: params.MainnetChainConfig.ChainID.Uint64(),
Type: snapshotsync.GetAvailableSnapshotTypes(params.MainnetChainConfig.ChainID.Uint64()),
})
bittorrentServer, err := snapshotsync.NewServer(snapshotsDir, seeding)
if err != nil {
log.Error("Download failed", "err", err, "networkID", params.MainnetChainConfig.ChainID.Uint64())
return fmt.Errorf("new server: %w", err)
}
log.Info("Load")
err = bittorrentServer.Load()
if err != nil {
return fmt.Errorf("load: %w", err)
}
}()
go func() {
for {
select {
case <-cmd.Context().Done():
return
default:
}

snapshots, err := bittorrentServer.Snapshots(ctx, &proto_snap.SnapshotsRequest{
go func() {
_, err := bittorrentServer.Download(ctx, &proto_snap.DownloadSnapshotRequest{
NetworkId: params.MainnetChainConfig.ChainID.Uint64(),
Type: snapshotsync.GetAvailableSnapshotTypes(params.MainnetChainConfig.ChainID.Uint64()),
})
if err != nil {
log.Error("get snapshots", "err", err)
time.Sleep(time.Minute)
continue
log.Error("Download failed", "err", err, "networkID", params.MainnetChainConfig.ChainID.Uint64())
}
stats := bittorrentServer.Stats(context.Background())
for _, v := range snapshots.Info {
log.Info("Snapshot "+v.Type.String(), "%", v.Readiness, "peers", stats[v.Type.String()].ConnectedSeeders)
}()
go func() {
for {
select {
case <-cmd.Context().Done():
return
default:
}

snapshots, err := bittorrentServer.Snapshots(ctx, &proto_snap.SnapshotsRequest{
NetworkId: params.MainnetChainConfig.ChainID.Uint64(),
})
if err != nil {
log.Error("get snapshots", "err", err)
time.Sleep(time.Minute)
continue
}
stats := bittorrentServer.Stats(context.Background())
for _, v := range snapshots.Info {
log.Info("Snapshot "+v.Type.String(), "%", v.Readiness, "peers", stats[v.Type.String()].ConnectedSeeders)
}
time.Sleep(time.Minute)
}
time.Sleep(time.Minute)
}()
grpcServer, err := StartGrpc(bittorrentServer, downloaderApiAddr, nil)
if err != nil {
return err
}
}()
grpcServer, err := StartGrpc(bittorrentServer, cfg.Addr, nil)
if err != nil {
return err
}
<-cmd.Context().Done()
grpcServer.GracefulStop()

return nil
<-cmd.Context().Done()
grpcServer.GracefulStop()

return nil
},
}

func StartGrpc(snServer *snapshotsync.SNDownloaderServer, addr string, creds *credentials.TransportCredentials) (*grpc.Server, error) {
Expand Down
20 changes: 0 additions & 20 deletions cmd/downloader/utils/utils.go

This file was deleted.

Loading

0 comments on commit 3f34dee

Please sign in to comment.