Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add indexer db connection #89

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bin/local-startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ else
docker compose up -d mongodb
fi

# Check if the MongoDB container is already running
MONGO_CONTAINER_NAME="indexer-mongodb"
if [ $(docker ps -q -f name=^/${MONGO_CONTAINER_NAME}$) ]; then
echo "Indexer mongoDB container already running. Skipping MongoDB startup."
else
echo "Starting indexer mongoDB"
# Start indexer mongoDB
docker compose up -d indexer-mongodb
fi

# Check if the RabbitMQ container is already running
RABBITMQ_CONTAINER_NAME="rabbitmq"
if [ $(docker ps -q -f name=^/${RABBITMQ_CONTAINER_NAME}$) ]; then
Expand Down
4 changes: 2 additions & 2 deletions cmd/staking-api-service/scripts/pubkey_address_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
)

func BackfillPubkeyAddressesMappings(ctx context.Context, cfg *config.Config) error {
client, err := dbclient.NewMongoClient(ctx, cfg.Db)
client, err := dbclient.NewMongoClient(ctx, cfg.StakingDb)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
v1dbClient, err := v1dbclient.New(ctx, client, cfg.Db)
v1dbClient, err := v1dbclient.New(ctx, client, cfg.StakingDb)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
Expand Down
9 changes: 8 additions & 1 deletion config/config-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@ server:
btc-net: "mainnet"
max-content-length: 4096
health-check-interval: 300 # 5 minutes interval
db:
staking-db:
username: root
password: example
address: "mongodb://mongodb:27017"
db-name: staking-api-service
max-pagination-limit: 10
db-batch-size-limit: 100
logical-shard-count: 10
indexer-db:
username: root
password: example
address: "mongodb://indexer-mongodb:27019"
db-name: indexer-db
max-pagination-limit: 10
db-batch-size-limit: 100
queue:
queue_user: user # can be replaced by values in .env file
queue_password: password
Expand Down
9 changes: 8 additions & 1 deletion config/config-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ server:
btc-net: "signet"
max-content-length: 4096
health-check-interval: 300 # 5 minutes interval
db:
staking-db:
username: root
password: example
address: "mongodb://localhost:27017/?directConnection=true"
db-name: staking-api-service
max-pagination-limit: 10
db-batch-size-limit: 100
indexer-db:
username: root
password: example
address: "mongodb://localhost:27019/?directConnection=true"
db-name: indexer-db
max-pagination-limit: 10
db-batch-size-limit: 100
logical-shard-count: 2
queue:
queue_user: user # can be replaced by values in .env file
Expand Down
12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ services:
volumes:
- ./bin/init-mongo.sh:/init-mongo.sh
entrypoint: [ "/init-mongo.sh" ]
indexer-mongodb:
image: mongo:latest
container_name: indexer-mongodb
hostname: indexer-mongodb
ports:
- "27019:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example
volumes:
- ./bin/init-mongo.sh:/init-mongo.sh
entrypoint: [ "/init-mongo.sh" ]
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
Expand Down
17 changes: 11 additions & 6 deletions internal/shared/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ import (
)

type Config struct {
Server *ServerConfig `mapstructure:"server"`
Db *DbConfig `mapstructure:"db"`
Queue *queue.QueueConfig `mapstructure:"queue"`
Metrics *MetricsConfig `mapstructure:"metrics"`
Assets *AssetsConfig `mapstructure:"assets"`
Server *ServerConfig `mapstructure:"server"`
StakingDb *DbConfig `mapstructure:"staking-db"`
IndexerDb *DbConfig `mapstructure:"indexer-db"`
Queue *queue.QueueConfig `mapstructure:"queue"`
Metrics *MetricsConfig `mapstructure:"metrics"`
Assets *AssetsConfig `mapstructure:"assets"`
}

func (cfg *Config) Validate() error {
if err := cfg.Server.Validate(); err != nil {
return err
}

if err := cfg.Db.Validate(); err != nil {
if err := cfg.StakingDb.Validate(); err != nil {
return err
}

if err := cfg.IndexerDb.Validate(); err != nil {
return err
}

Expand Down
18 changes: 10 additions & 8 deletions internal/shared/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type DbConfig struct {
Address string `mapstructure:"address"`
MaxPaginationLimit int64 `mapstructure:"max-pagination-limit"`
DbBatchSizeLimit int64 `mapstructure:"db-batch-size-limit"`
LogicalShardCount int64 `mapstructure:"logical-shard-count"`
LogicalShardCount *int64 `mapstructure:"logical-shard-count"`
}

func (cfg *DbConfig) Validate() error {
Expand Down Expand Up @@ -72,14 +72,16 @@ func (cfg *DbConfig) Validate() error {
return fmt.Errorf("db batch size limit must be greater than 0")
}

if cfg.LogicalShardCount <= 1 {
return fmt.Errorf("logical shard count must be greater than 1")
}
if cfg.LogicalShardCount != nil {
if *cfg.LogicalShardCount <= 1 {
return fmt.Errorf("logical shard count must be greater than 1")
}

// Below is adding as a safety net to avoid performance issue.
// Changes to the logical shard count shall be discussed with the team
if cfg.LogicalShardCount > maxLogicalShardCount {
return fmt.Errorf("large logical shard count will have significant performance impact, please inform the team before changing this value")
// Below is adding as a safety net to avoid performance issue.
// Changes to the logical shard count shall be discussed with the team
if *cfg.LogicalShardCount > maxLogicalShardCount {
return fmt.Errorf("large logical shard count will have significant performance impact, please inform the team before changing this value")
}
}

return nil
Expand Down
40 changes: 28 additions & 12 deletions internal/shared/db/clients/db_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,62 @@ import (

"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
indexerdbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/indexer_db_client"
v1dbclient "github.com/babylonlabs-io/staking-api-service/internal/v1/db/client"
v2dbclient "github.com/babylonlabs-io/staking-api-service/internal/v2/db/client"
"github.com/rs/zerolog/log"
"go.mongodb.org/mongo-driver/mongo"
)

type DbClients struct {
MongoClient *mongo.Client
SharedDBClient dbclient.DBClient
V1DBClient v1dbclient.V1DBClient
V2DBClient v2dbclient.V2DBClient
StakingMongoClient *mongo.Client
IndexerMongoClient *mongo.Client
SharedDBClient dbclient.DBClient
V1DBClient v1dbclient.V1DBClient
V2DBClient v2dbclient.V2DBClient
IndexerDBClient indexerdbclient.IndexerDBClient
jeremy-babylonlabs marked this conversation as resolved.
Show resolved Hide resolved
}

func New(ctx context.Context, cfg *config.Config) (*DbClients, error) {
mongoClient, err := dbclient.NewMongoClient(ctx, cfg.Db)
stakingMongoClient, err := dbclient.NewMongoClient(ctx, cfg.StakingDb)
if err != nil {
return nil, err
}

dbClient, err := dbclient.New(ctx, mongoClient, cfg.Db)
dbClient, err := dbclient.New(ctx, stakingMongoClient, cfg.StakingDb)
if err != nil {
return nil, err
}

v1dbClient, err := v1dbclient.New(ctx, mongoClient, cfg.Db)
v1dbClient, err := v1dbclient.New(ctx, stakingMongoClient, cfg.StakingDb)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("error while creating v1 db client")
return nil, err
}
v2dbClient, err := v2dbclient.New(ctx, mongoClient, cfg.Db)
v2dbClient, err := v2dbclient.New(ctx, stakingMongoClient, cfg.StakingDb)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("error while creating v2 db client")
return nil, err
}

indexerMongoClient, err := dbclient.NewMongoClient(ctx, cfg.IndexerDb)
if err != nil {
return nil, err
}

indexerDbClient, err := indexerdbclient.New(ctx, indexerMongoClient, cfg.IndexerDb)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("error while creating indexer db client")
return nil, err
}

dbClients := DbClients{
MongoClient: mongoClient,
SharedDBClient: dbClient,
V1DBClient: v1dbClient,
V2DBClient: v2dbClient,
StakingMongoClient: stakingMongoClient,
IndexerMongoClient: indexerMongoClient,
SharedDBClient: dbClient,
V1DBClient: v1dbClient,
V2DBClient: v2dbClient,
IndexerDBClient: indexerDbClient,
}

return &dbClients, nil
Expand Down
23 changes: 23 additions & 0 deletions internal/shared/db/indexer_db_client/db_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package indexerdbclient

import (
"context"

"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
"go.mongodb.org/mongo-driver/mongo"
)

type IndexerDatabase struct {
*dbclient.Database
}

func New(ctx context.Context, client *mongo.Client, cfg *config.DbConfig) (*IndexerDatabase, error) {
return &IndexerDatabase{
Database: &dbclient.Database{
DbName: cfg.DbName,
Client: client,
Cfg: cfg,
},
}, nil
}
9 changes: 9 additions & 0 deletions internal/shared/db/indexer_db_client/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package indexerdbclient

import (
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
)

type IndexerDBClient interface {
dbclient.DBClient
}
8 changes: 4 additions & 4 deletions internal/shared/db/model/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ var collections = map[string][]index{

func Setup(ctx context.Context, cfg *config.Config) error {
credential := options.Credential{
Username: cfg.Db.Username,
Password: cfg.Db.Password,
Username: cfg.StakingDb.Username,
Password: cfg.StakingDb.Password,
}
clientOps := options.Client().ApplyURI(cfg.Db.Address).SetAuth(credential)
clientOps := options.Client().ApplyURI(cfg.StakingDb.Address).SetAuth(credential)
client, err := mongo.Connect(ctx, clientOps)
if err != nil {
return err
Expand All @@ -72,7 +72,7 @@ func Setup(ctx context.Context, cfg *config.Config) error {
defer cancel()

// Access a database and create collections.
database := client.Database(cfg.Db.DbName)
database := client.Database(cfg.StakingDb.DbName)

// Create collections.
for collection := range collections {
Expand Down
7 changes: 4 additions & 3 deletions internal/shared/services/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ func New(

// DoHealthCheck checks the health of the services by ping the database.
func (s *Service) DoHealthCheck(ctx context.Context) error {
return s.DbClients.V1DBClient.Ping(ctx)

// TODO: extend on this to ping indexer db
if err := s.DbClients.SharedDBClient.Ping(ctx); err != nil {
return err
}
return s.DbClients.IndexerDBClient.Ping(ctx)
}

func (s *Service) SaveUnprocessableMessages(ctx context.Context, messageBody, receipt string) *types.Error {
Expand Down
4 changes: 2 additions & 2 deletions internal/v1/db/client/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (v1dbclient *V1Database) SubtractOverallStats(
func (v1dbclient *V1Database) GetOverallStats(ctx context.Context) (*v1dbmodel.OverallStatsDocument, error) {
// The collection is sharded by the _id field, so we need to query all the shards
var shardsId []string
for i := 0; i < int(v1dbclient.Cfg.LogicalShardCount); i++ {
for i := 0; i < int(*v1dbclient.Cfg.LogicalShardCount); i++ {
shardsId = append(shardsId, fmt.Sprintf("%d", i))
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func (v1dbclient *V1Database) GetOverallStats(ctx context.Context) (*v1dbmodel.O
// It's a logical shard to avoid locking the same field during concurrent writes
// The sharding number should never be reduced after roll out
func (v1dbclient *V1Database) generateOverallStatsId() (string, error) {
max := big.NewInt(int64(v1dbclient.Cfg.LogicalShardCount))
max := big.NewInt(*v1dbclient.Cfg.LogicalShardCount)
// Generate a secure random number within the range [0, max)
n, err := rand.Int(rand.Reader, max)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion tests/config/config-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@ server:
btc-net: "signet"
max-content-length: 40960
health-check-interval: 2
db:
staking-db:
username: root
password: example
address: "mongodb://localhost:27017"
db-name: staking-api-service
max-pagination-limit: 10
db-batch-size-limit: 100
logical-shard-count: 2
indexer-db:
username: root
password: example
address: "mongodb://localhost:27019"
db-name: indexer-db
max-pagination-limit: 10
db-batch-size-limit: 100
queue:
queue_user: user
queue_password: password
Expand Down
Loading
Loading