Skip to content

Commit

Permalink
Support shadow databases
Browse files Browse the repository at this point in the history
This adds support for having Encore manage shadow databases,
commonly used with database migration tools like Atlas and Prisma.
  • Loading branch information
eandre committed Nov 21, 2023
1 parent 6d9c53c commit f81bff2
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 310 deletions.
83 changes: 58 additions & 25 deletions cli/cmd/encore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var dbCmd = &cobra.Command{
var (
resetAll bool
testDB bool
shadowDB bool
nsName string
)

Expand All @@ -53,7 +54,7 @@ var dbResetCmd = &cobra.Command{
stream, err := daemon.DBReset(ctx, &daemonpb.DBResetRequest{
AppRoot: appRoot,
DatabaseNames: dbNames,
Test: testDB,
ClusterType: dbClusterType(),
Namespace: nonZeroPtr(nsName),
})
if err != nil {
Expand All @@ -66,13 +67,16 @@ var dbResetCmd = &cobra.Command{
var dbEnv string

var dbShellCmd = &cobra.Command{
Use: "shell DATABASE_NAME [--test] [--env=<name>]",
Use: "shell DATABASE_NAME [--env=<name>] [--test|--shadow]",
Short: "Connects to the database via psql shell",
Long: `Defaults to connecting to your local environment.
Specify --env to connect to another environment.
Use --test to connect to databases used for integration testing.
--test implies --env=local.
Use --shadow to connect to the shadow database, used for database drift detection
when using tools like Prisma.
--test and --shadow imply --env=local.
`,
Args: cobra.MaximumNArgs(1),

Expand Down Expand Up @@ -105,16 +109,16 @@ Use --test to connect to databases used for integration testing.
}
}

if testDB {
if testDB || shadowDB {
dbEnv = "local"
}

resp, err := daemon.DBConnect(ctx, &daemonpb.DBConnectRequest{
AppRoot: appRoot,
DbName: dbName,
EnvName: dbEnv,
Test: testDB,
Namespace: nonZeroPtr(nsName),
AppRoot: appRoot,
DbName: dbName,
EnvName: dbEnv,
ClusterType: dbClusterType(),
Namespace: nonZeroPtr(nsName),
})
if err != nil {
fatalf("could not connect to the database for service %s: %v", dbName, err)
Expand Down Expand Up @@ -162,12 +166,15 @@ Use --test to connect to databases used for integration testing.
var dbProxyPort int32

var dbProxyCmd = &cobra.Command{
Use: "proxy [--env=<name>] [--test]",
Use: "proxy [--env=<name>] [--test|--shadow]",
Short: "Sets up a proxy tunnel to the database",
Long: `Set up a proxy tunnel to a database for use with other tools.
Use --test to connect to databases used for integration testing.
--test implies --env=local.
Use --shadow to connect to the shadow database, used for database drift detection
when using tools like Prisma.
--test and --shadow imply --env=local.
`,

Run: func(command *cobra.Command, args []string) {
Expand All @@ -181,17 +188,17 @@ Use --test to connect to databases used for integration testing.
cancel()
}()

if testDB {
if testDB || shadowDB {
dbEnv = "local"
}

daemon := setupDaemon(ctx)
stream, err := daemon.DBProxy(ctx, &daemonpb.DBProxyRequest{
AppRoot: appRoot,
EnvName: dbEnv,
Port: dbProxyPort,
Test: testDB,
Namespace: nonZeroPtr(nsName),
AppRoot: appRoot,
EnvName: dbEnv,
Port: dbProxyPort,
ClusterType: dbClusterType(),
Namespace: nonZeroPtr(nsName),
})
if err != nil {
log.Fatal().Err(err).Msg("could not setup db proxy")
Expand All @@ -201,9 +208,17 @@ Use --test to connect to databases used for integration testing.
}

var dbConnURICmd = &cobra.Command{
Use: "conn-uri [<db-name>] [--test]",
Use: "conn-uri [<db-name>] [--test|--shadow]",
Short: "Outputs the database connection string",
Args: cobra.MaximumNArgs(1),
Long: `Retrieve a stable connection uri for connecting to a database.
Use --test to connect to databases used for integration testing.
Use --shadow to connect to the shadow database, used for database drift detection
when using tools like Prisma.
--test and --shadow imply --env=local.
`,
Args: cobra.MaximumNArgs(1),

Run: func(command *cobra.Command, args []string) {
appRoot, relPath := determineAppRoot()
Expand Down Expand Up @@ -231,16 +246,16 @@ var dbConnURICmd = &cobra.Command{
}
}

if testDB {
if testDB || shadowDB {
dbEnv = "local"
}

resp, err := daemon.DBConnect(ctx, &daemonpb.DBConnectRequest{
AppRoot: appRoot,
DbName: dbName,
EnvName: dbEnv,
Test: testDB,
Namespace: nonZeroPtr(nsName),
AppRoot: appRoot,
DbName: dbName,
EnvName: dbEnv,
ClusterType: dbClusterType(),
Namespace: nonZeroPtr(nsName),
})
if err != nil {
st, ok := status.FromError(err)
Expand All @@ -262,21 +277,39 @@ func init() {
dbResetCmd.Flags().StringVarP(&nsName, "namespace", "n", "", "Namespace to use (defaults to active namespace)")
dbResetCmd.Flags().BoolVar(&resetAll, "all", false, "Reset all services in the application")
dbResetCmd.Flags().BoolVarP(&testDB, "test", "t", false, "Reset databases in the test cluster instead")
dbResetCmd.Flags().BoolVar(&shadowDB, "shadow", false, "Reset databases in the shadow cluster instead")
dbCmd.AddCommand(dbResetCmd)

dbShellCmd.Flags().StringVarP(&nsName, "namespace", "n", "", "Namespace to use (defaults to active namespace)")
dbShellCmd.Flags().StringVarP(&dbEnv, "env", "e", "local", "Environment name to connect to (such as \"prod\")")
dbShellCmd.Flags().BoolVarP(&testDB, "test", "t", false, "Connect to the integration test database (implies --env=local)")
dbShellCmd.Flags().BoolVar(&shadowDB, "shadow", false, "Connect to the shadow database (implies --env=local)")
dbCmd.AddCommand(dbShellCmd)

dbProxyCmd.Flags().StringVarP(&nsName, "namespace", "n", "", "Namespace to use (defaults to active namespace)")
dbProxyCmd.Flags().StringVarP(&dbEnv, "env", "e", "local", "Environment name to connect to (such as \"prod\")")
dbProxyCmd.Flags().Int32VarP(&dbProxyPort, "port", "p", 0, "Port to listen on (defaults to a random port)")
dbProxyCmd.Flags().BoolVarP(&testDB, "test", "t", false, "Connect to the integration test database (implies --env=local)")
dbProxyCmd.Flags().BoolVar(&shadowDB, "shadow", false, "Connect to the shadow database (implies --env=local)")
dbCmd.AddCommand(dbProxyCmd)

dbConnURICmd.Flags().StringVarP(&nsName, "namespace", "n", "", "Namespace to use (defaults to active namespace)")
dbConnURICmd.Flags().StringVarP(&dbEnv, "env", "e", "local", "Environment name to connect to (such as \"prod\")")
dbConnURICmd.Flags().BoolVarP(&testDB, "test", "t", false, "Connect to the integration test database (implies --env=local)")
dbConnURICmd.Flags().BoolVar(&shadowDB, "shadow", false, "Connect to the shadow database (implies --env=local)")
dbCmd.AddCommand(dbConnURICmd)
}

// dbClusterType translates the --test/--shadow command-line flags into
// the daemon protobuf cluster type. The flags are mutually exclusive;
// passing both is reported as a fatal usage error.
func dbClusterType() daemonpb.DBClusterType {
	if testDB && shadowDB {
		fatal("cannot specify both --test and --shadow")
	}
	if testDB {
		return daemonpb.DBClusterType_DB_CLUSTER_TYPE_TEST
	}
	if shadowDB {
		return daemonpb.DBClusterType_DB_CLUSTER_TYPE_SHADOW
	}
	// Neither flag set: use the regular run cluster.
	return daemonpb.DBClusterType_DB_CLUSTER_TYPE_RUN
}
43 changes: 26 additions & 17 deletions cli/daemon/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Server) dbConnectLocal(ctx context.Context, req *daemonpb.DBConnectRequ
databaseExists := false
for _, s := range parse.Meta.SqlDatabases {
if s.Name == req.DbName {
databaseExists = len(s.Migrations) > 0
databaseExists = true
break
}
}
Expand All @@ -97,19 +97,21 @@ func (s *Server) dbConnectLocal(ctx context.Context, req *daemonpb.DBConnectRequ
return nil, err
}

clusterType := sqldb.Run
passwd := "local-" + string(clusterNS.ID)
if req.Test {
clusterType = sqldb.Test
passwd = "test-" + string(clusterNS.ID)
var passwd string
clusterType := getClusterType(req)
switch clusterType {
case sqldb.Run:
passwd = "local-" + string(clusterNS.ID)
default:
passwd = fmt.Sprintf("%s-%s", clusterType, clusterNS.ID)
}

clusterID := sqldb.GetClusterID(app, clusterType, clusterNS)
log := log.With().Interface("cluster", clusterID).Logger()
log.Info().Msg("setting up database cluster")
cluster := s.cm.Create(ctx, &sqldb.CreateParams{
ClusterID: clusterID,
Memfs: clusterType == sqldb.Test,
Memfs: clusterType.Memfs(),
})
// TODO would be nice to stream this to the CLI
if _, err := cluster.Start(ctx, nil); err != nil {
Expand Down Expand Up @@ -184,10 +186,7 @@ func (s *Server) DBProxy(params *daemonpb.DBProxyRequest, stream daemonpb.Daemon
return err
}

clusterType := sqldb.Run
if params.Test {
clusterType = sqldb.Test
}
clusterType := getClusterType(params)

clusterNS, err := s.namespaceOrActive(stream.Context(), app, params.Namespace)
if err != nil {
Expand All @@ -197,7 +196,7 @@ func (s *Server) DBProxy(params *daemonpb.DBProxyRequest, stream daemonpb.Daemon
clusterID := sqldb.GetClusterID(app, clusterType, clusterNS)
cluster := s.cm.Create(ctx, &sqldb.CreateParams{
ClusterID: clusterID,
Memfs: false,
Memfs: clusterType.Memfs(),
})
if _, err := cluster.Start(ctx, nil); err != nil {
return err
Expand Down Expand Up @@ -291,16 +290,13 @@ func (s *Server) DBReset(req *daemonpb.DBResetRequest, stream daemonpb.Daemon_DB
return nil
}

clusterType := sqldb.Run
if req.Test {
clusterType = sqldb.Test
}
clusterType := getClusterType(req)
clusterID := sqldb.GetClusterID(app, clusterType, clusterNS)
cluster, ok := s.cm.Get(clusterID)
if !ok {
cluster = s.cm.Create(stream.Context(), &sqldb.CreateParams{
ClusterID: clusterID,
Memfs: clusterType == sqldb.Test,
Memfs: clusterType.Memfs(),
})
}

Expand Down Expand Up @@ -340,3 +336,16 @@ func serveProxy(ctx context.Context, ln net.Listener, handler func(context.Conte
go handler(ctx, frontend)
}
}

// getClusterType maps the cluster type carried in a daemon protobuf
// request to the internal sqldb cluster type. Unknown or unspecified
// values fall back to the regular run cluster.
func getClusterType(req interface{ GetClusterType() daemonpb.DBClusterType }) sqldb.ClusterType {
	mapping := map[daemonpb.DBClusterType]sqldb.ClusterType{
		daemonpb.DBClusterType_DB_CLUSTER_TYPE_RUN:    sqldb.Run,
		daemonpb.DBClusterType_DB_CLUSTER_TYPE_TEST:   sqldb.Test,
		daemonpb.DBClusterType_DB_CLUSTER_TYPE_SHADOW: sqldb.Shadow,
	}
	if ct, ok := mapping[req.GetClusterType()]; ok {
		return ct
	}
	return sqldb.Run
}
2 changes: 1 addition & 1 deletion cli/daemon/run/infra/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (rm *ResourceManager) StartSQLCluster(a *optracker.AsyncBuildJobs, md *meta

cluster := rm.sqlMgr.Create(ctx, &sqldb.CreateParams{
ClusterID: sqldb.GetClusterID(rm.app, typ, rm.ns),
Memfs: rm.forTests,
Memfs: typ.Memfs(),
})

if _, err := cluster.Start(ctx, a.Tracker()); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cli/daemon/sqldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (db *DB) EnsureRoles(ctx context.Context, roles ...Role) error {

// Migrate migrates the database.
func (db *DB) Migrate(ctx context.Context, appRoot string, dbMeta *meta.SQLDatabase) (err error) {
if db.Cluster.ID.Type == Shadow {
db.log.Debug().Msg("not applying migrations to shadow cluster")
return nil
}
if len(dbMeta.Migrations) == 0 {
db.log.Debug().Msg("no database migrations to run, skipping")
return nil
Expand Down
16 changes: 14 additions & 2 deletions cli/daemon/sqldb/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,22 @@ type ConnConfig struct {
// ClusterType identifies the kind of database cluster.
type ClusterType string

const (
	Run    ClusterType = "run"    // regular cluster used for running the app
	Shadow ClusterType = "shadow" // shadow cluster, used by drift-detection tools
	Test   ClusterType = "test"   // ephemeral cluster for integration tests
)

// Memfs reports whether clusters of this type should store their data
// on an in-memory filesystem. Shadow and test clusters are ephemeral
// and use memfs; run clusters (and any unknown type) persist to disk.
func (ct ClusterType) Memfs() bool {
	return ct == Shadow || ct == Test
}

// CreateParams are the params to (*ClusterManager).Create.
type CreateParams struct {
ClusterID ClusterID
Expand Down
4 changes: 3 additions & 1 deletion cli/daemon/sqldb/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (cm *ClusterManager) ProxyConn(client net.Conn, waitForSetup bool) error {
ct = Run
case "test":
ct = Test
case "shadow":
ct = Shadow
default:
cm.log.Error().Str("password", startup.Password).Msg("dbproxy: invalid password for connection URI")
_ = cl.Backend.Send(&pgproto3.ErrorResponse{
Expand All @@ -139,7 +141,7 @@ func (cm *ClusterManager) ProxyConn(client net.Conn, waitForSetup bool) error {
// with the app in question yet on this run
cluster = cm.Create(context.Background(), &CreateParams{
ClusterID: GetClusterID(app, ct, ns),
Memfs: false,
Memfs: ct.Memfs(),
})

// Ensure the cluster is started
Expand Down
Loading

0 comments on commit f81bff2

Please sign in to comment.