Skip to content

Commit

Permalink
set up proof collection coordinator. closes #10
Browse files Browse the repository at this point in the history
  • Loading branch information
gjermundgaraba committed Jun 30, 2024
1 parent eca2c65 commit 9c33314
Show file tree
Hide file tree
Showing 15 changed files with 1,765 additions and 59 deletions.
42 changes: 42 additions & 0 deletions prover-sidecar/cmd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cmd

import (
"github.com/spf13/cobra"
"proversidecar/config"
)

const (
configCommandName = "config" // so we can reference it in the root command pre-run hook

flagForceInitConfig = "force"
)

func ConfigCmd() *cobra.Command {
cmd := &cobra.Command{
Use: configCommandName,
Short: "config subcommands",
}

cmd.AddCommand(InitConfigCmd())

return cmd
}

func InitConfigCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "init",
Short: "init config file",
RunE: func(cmd *cobra.Command, args []string) error {
force, _ := cmd.Flags().GetBool(flagForceInitConfig)

homedir := GetHomedir(cmd)
logger := GetLogger(cmd)

return config.InitConfig(logger, homedir, force)
},
}

cmd.Flags().Bool(flagForceInitConfig, false, "force overwrite of existing config file")

return cmd
}
67 changes: 52 additions & 15 deletions prover-sidecar/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
"path/filepath"
"proversidecar/server"
"proversidecar/utils"
"proversidecar/config"
)

const (
Expand All @@ -34,7 +35,8 @@ func RootCmd() *cobra.Command {
if err != nil {
return err
}
logger := utils.CreateLogger(verbose)

logger := CreateLogger(verbose)
cmd.SetContext(context.WithValue(cmd.Context(), ContextKeyLogger, logger))

homedir, err := cmd.Flags().GetString(flagHome)
Expand All @@ -43,30 +45,33 @@ func RootCmd() *cobra.Command {
}
cmd.SetContext(context.WithValue(cmd.Context(), ContextKeyHomedir, homedir))

config, found, err := server.ReadConfig(homedir)
if err != nil {
return err
}
if cmd.Parent() != nil && cmd.Parent().Name() != configCommandName {
sidecarConfig, found, err := config.ReadConfig(homedir)
if err != nil {
return err
}

if !found {
if err := server.InitConfig(homedir); err != nil {
if !found {
if err := config.InitConfig(logger, homedir, false); err != nil {
return err
}
return errors.New("config.toml was not found, example config.toml created")
}

if err := sidecarConfig.Validate(); err != nil {
return err
}
return errors.New("config.toml was not found, example config.toml created")
}

if err := config.Validate(); err != nil {
return err
cmd.SetContext(context.WithValue(cmd.Context(), ContextKeyConfig, sidecarConfig))
}

cmd.SetContext(context.WithValue(cmd.Context(), ContextKeyConfig, config))

return nil
},
}

cmd.AddCommand(
StartCmd(),
ConfigCmd(),
)

userHomeDir, err := os.UserHomeDir()
Expand All @@ -80,3 +85,35 @@ func RootCmd() *cobra.Command {

return cmd
}

func CreateLogger(verbose bool) *zap.Logger {
logLevel := zapcore.InfoLevel
if verbose {
logLevel = zapcore.DebugLevel
}

loggerConfig := zap.NewProductionConfig()
loggerConfig.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
loggerConfig.Encoding = "console"
loggerConfig.Level = zap.NewAtomicLevelAt(logLevel)

// Create the logger from the core
logger, err := loggerConfig.Build()
if err != nil {
panic(err)
}

return logger
}

func GetLogger(cmd *cobra.Command) *zap.Logger {
return cmd.Context().Value(ContextKeyLogger).(*zap.Logger)
}

func GetHomedir(cmd *cobra.Command) string {
return cmd.Context().Value(ContextKeyHomedir).(string)
}

func GetConfig(cmd *cobra.Command) config.Config {
return cmd.Context().Value(ContextKeyConfig).(config.Config)
}
37 changes: 31 additions & 6 deletions prover-sidecar/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package cmd

import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"proversidecar/coordinator"
"proversidecar/server"
)

const (
defaultPort = 6969

flagListenAddr = "listen-addr"
)

Expand All @@ -18,17 +19,41 @@ func StartCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
listenAddr, _ := cmd.Flags().GetString(flagListenAddr)

s := &server.Server{}
if err := s.Serve(listenAddr); err != nil {
sidecarConfig := GetConfig(cmd)
logger := GetLogger(cmd)

coord, err := coordinator.NewCoordinator(logger, sidecarConfig)
if err != nil {
return err
}

return nil
s := server.NewServer(logger)

var eg errgroup.Group

eg.Go(func() error {
if err := s.Serve(listenAddr); err != nil {
logger.Error("server.Serve crashed", zap.Error(err))
return err
}

return nil
})

eg.Go(func() error {
if err := coord.Run(cmd.Context()); err != nil {
logger.Error("coordinator.Run crashed", zap.Error(err))
return err
}

return nil
})

return eg.Wait()
},
}

cmd.Flags().String(flagListenAddr, "localhost:6969", "Address for grpc server to listen on")


return cmd
}
38 changes: 28 additions & 10 deletions prover-sidecar/server/config.go → prover-sidecar/config/config.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
package server
package config

import (
"github.com/pelletier/go-toml/v2"
"gitlab.com/tozd/go/errors"
"go.uber.org/zap"
"os"
"path"
)

const configFileName = "config.toml"
const (
configFileName = "config.toml"
)

type Config struct {
CometBFTChains []CometBFTChain `toml:"comet_bft_chains"`
CosmosChains []CosmosChainConfig `toml:"cosmos_chain"`
}

type CometBFTChain struct {
type CosmosChainConfig struct {
ChainID string `toml:"chain_id"`
RPC string `toml:"rpc"`
ClientID string `toml:"client_id"`
}

func (c Config) Validate() error {
if len(c.CometBFTChains) == 0 {
if len(c.CosmosChains) == 0 {
return errors.New("at least one chain must be defined in the config")
}

seenChainIDs := make(map[string]bool)
for _, chain := range c.CometBFTChains {
for _, chain := range c.CosmosChains {
if chain.ChainID == "" {
return errors.New("chain id cannot be empty")
}
Expand All @@ -33,6 +37,10 @@ func (c Config) Validate() error {
return errors.New("rpc address cannot be empty")
}

if chain.ClientID == "" {
return errors.New("client id cannot be empty")
}

if _, ok := seenChainIDs[chain.ChainID]; ok {
return errors.New("duplicate chain id")
}
Expand Down Expand Up @@ -66,18 +74,28 @@ func ReadConfig(homedir string) (Config, bool, error) {
return config, true, nil
}

func InitConfig(homedir string) error {
func InitConfig(logger *zap.Logger, homedir string, force bool) error {
configFilePath := getConfigFilePath(homedir)

if !force {
_, err := os.Stat(configFilePath)
if !os.IsNotExist(err) {
return errors.Errorf("config file already exists at %s", configFilePath)
}
}

logger.Debug("InitConfig", zap.String("configFilePath", configFilePath))

if err := os.MkdirAll(homedir, os.ModePerm); err != nil {
return err
}

configFilePath := getConfigFilePath(homedir)

config := Config{
CometBFTChains: []CometBFTChain{
CosmosChains: []CosmosChainConfig{
{
ChainID: "example-1",
RPC: "http://localhost:26657",
ClientID: "example-1-client",
},
},
}
Expand Down
77 changes: 77 additions & 0 deletions prover-sidecar/coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package coordinator

import (
"context"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"proversidecar/config"
"proversidecar/provers"
"proversidecar/provers/cosmos"
"time"
)

const (
defaultMinQueryLoopDuration = 1 * time.Second
)

type Coordinator struct {
logger *zap.Logger

chainProvers []provers.ChainProver
}

func NewCoordinator(logger *zap.Logger, sidecarConfig config.Config) (*Coordinator, error) {
var chainProvers []provers.ChainProver
for _, cosmosConfig := range sidecarConfig.CosmosChains {
prover, err := cosmos.NewCosmosProver(logger, cosmosConfig.ChainID, cosmosConfig.RPC, cosmosConfig.ClientID)
if err != nil {
return nil, err
}
chainProvers = append(chainProvers, prover)
}

return &Coordinator{
logger: logger,
chainProvers: chainProvers,
}, nil
}

func (c *Coordinator) Run(ctx context.Context) error {
c.logger.Debug("Coordinator.Run")

var eg errgroup.Group
runCtx, runCtxCancel := context.WithCancel(ctx)
for _, chainProver := range c.chainProvers {
c.logger.Info("Starting chain prover loop", zap.String("chain_id", chainProver.ChainID()))

chainProver := chainProver
eg.Go(func() error {
err := c.chainProverLoop(runCtx, chainProver)
runCtxCancel() // Signal the other chain processors to exit.
return err
})
}

err := eg.Wait()
runCtxCancel()
return err
}

func (c *Coordinator) chainProverLoop(ctx context.Context, chainProver provers.ChainProver) error {
ticker := time.NewTicker(defaultMinQueryLoopDuration) // TODO: Make this configurable per chain
defer ticker.Stop()

for {
// TODO: Add retry logic
c.logger.Info("Collecting proofs", zap.String("chain_id", chainProver.ChainID()))
if err := chainProver.CollectProofs(ctx); err != nil {
return err
}
select {
case <-ctx.Done():
return nil
case <-ticker.C:
ticker.Reset(defaultMinQueryLoopDuration)
}
}
}
Loading

0 comments on commit 9c33314

Please sign in to comment.