Skip to content

Commit

Permalink
Add DB migration status in the dev dashboard. (#1737)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikcarlsson authored Jan 24, 2025
1 parent 87a6d09 commit 8bc0b1d
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cli/cmd/encore/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (d *Daemon) serveObjects() {

func (d *Daemon) serveDash() {
log.Info().Stringer("addr", d.Dash.Addr()).Msg("serving dash")
srv := dash.NewServer(d.Apps, d.RunMgr, d.Trace, d.Dash.Port())
srv := dash.NewServer(d.Apps, d.RunMgr, d.NS, d.Trace, d.Dash.Port())
d.exit <- http.Serve(d.Dash, srv)
}

Expand Down
115 changes: 115 additions & 0 deletions cli/daemon/dash/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"encr.dev/cli/daemon/apps"
"encr.dev/cli/daemon/dash/ai"
"encr.dev/cli/daemon/engine/trace2"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/run"
"encr.dev/cli/daemon/sqldb"
"encr.dev/cli/internal/browser"
"encr.dev/cli/internal/jsonrpc2"
"encr.dev/cli/internal/onboarding"
Expand All @@ -40,6 +42,7 @@ type handler struct {
rpc jsonrpc2.Conn
apps *apps.Manager
run *run.Manager
ns *namespace.Manager
ai *ai.Manager
tr trace2.Store
}
Expand All @@ -64,6 +67,23 @@ func (h *handler) GetMeta(appID string) (*meta.Data, error) {
return md, nil
}

func (h *handler) GetNamespace(ctx context.Context, appID string) (*namespace.Namespace, error) {
runInstance := h.run.FindRunByAppID(appID)
if runInstance != nil && runInstance.ProcGroup() != nil {
return runInstance.NS, nil
} else {
app, err := h.apps.FindLatestByPlatformOrLocalID(appID)
if err != nil {
return nil, err
}
ns, err := h.ns.GetActive(ctx, app)
if err != nil {
return nil, err
}
return ns, nil
}
}

func (h *handler) Handle(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2.Request) error {
reply = makeProtoReplier(reply)

Expand Down Expand Up @@ -264,7 +284,39 @@ func (h *handler) Handle(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2
}

return reply(ctx, status, nil)
case "db-migration-status":
var params struct {
AppID string
}
if err := unmarshal(&params); err != nil {
return reply(ctx, nil, err)
}

// Find the latest app by platform ID or local ID.
app, err := h.apps.FindLatestByPlatformOrLocalID(params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

appMeta, err := h.GetMeta(params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

namespace, err := h.GetNamespace(ctx, params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

clusterType := sqldb.Run
cluster, ok := h.run.ClusterMgr.Get(sqldb.GetClusterID(app, clusterType, namespace))
if !ok {
return reply(ctx, nil, fmt.Errorf("failed to get database cluster of type %s", clusterType))
}

status := buildDbMigrationStatus(ctx, appMeta, cluster)

return reply(ctx, status, nil)
case "api-call":
telemetry.Send("api.call")
var params apiCallParams
Expand Down Expand Up @@ -1015,6 +1067,18 @@ type appStatus struct {
CompileError string `json:"compileError,omitempty"`
}

type dbMigrationHistory struct {
DatabaseName string `json:"databaseName"`
Migrations []dbMigration `json:"migrations"`
}

type dbMigration struct {
Filename string `json:"filename"`
Number uint64 `json:"number"`
Description string `json:"description"`
Applied bool `json:"applied"`
}

func buildAppStatus(app *apps.Instance, runInstance *run.Run) (s appStatus, err error) {
// Now try and grab latest metadata for the app
var md *meta.Data
Expand Down Expand Up @@ -1064,3 +1128,54 @@ func buildAppStatus(app *apps.Instance, runInstance *run.Run) (s appStatus, err

return resp, nil
}

func buildDbMigrationStatus(ctx context.Context, appMeta *meta.Data, cluster *sqldb.Cluster) []dbMigrationHistory {
var statuses []dbMigrationHistory
for _, dbMeta := range appMeta.SqlDatabases {
db, ok := cluster.GetDB(dbMeta.Name)
if !ok {
log.Error().Msgf("failed to get database %s", dbMeta.Name)
continue
}
appliedVersions, err := db.ListAppliedMigrations(ctx)
if err != nil {
log.Error().Msgf("failed to list applied migrations for database %s: %v", dbMeta.Name, err)
continue
}
statuses = append(statuses, buildMigrationHistory(dbMeta, appliedVersions))
}
return statuses
}

func buildMigrationHistory(dbMeta *meta.SQLDatabase, appliedVersions map[uint64]bool) dbMigrationHistory {
history := dbMigrationHistory{
DatabaseName: dbMeta.Name,
Migrations: []dbMigration{},
}
// Go over migrations from latest to earliest
sortedMigrations := make([]*meta.DBMigration, len(dbMeta.Migrations))
copy(sortedMigrations, dbMeta.Migrations)
slices.SortStableFunc(sortedMigrations, func(a, b *meta.DBMigration) int {
return int(b.Number - a.Number)
})
implicitlyApplied := false
for _, migration := range sortedMigrations {
dirty, attempted := appliedVersions[migration.Number]
applied := attempted && !dirty
// If the database doesn't allow non-sequential migrations,
// then any migrations before the last applied will also have
// been applied even if we don't see them in the database.
if !dbMeta.AllowNonSequentialMigrations && applied {
implicitlyApplied = true
}

status := dbMigration{
Filename: migration.Filename,
Number: migration.Number,
Description: migration.Description,
Applied: applied || implicitlyApplied,
}
history.Migrations = append(history.Migrations, status)
}
return history
}
138 changes: 138 additions & 0 deletions cli/daemon/dash/dash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package dash

import (
"reflect"
"testing"

meta "encr.dev/proto/encore/parser/meta/v1"
)

func TestBuildMigrationHistory(t *testing.T) {
tests := []struct {
name string
dbMeta *meta.SQLDatabase
appliedVersions map[uint64]bool
want dbMigrationHistory
}{
{
name: "sequential migrations all applied cleanly",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: false, // clean
3: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: true},
{Number: 2, Filename: "002.sql", Description: "second", Applied: true},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "sequential migrations with dirty migration",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: true, // dirty
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: false},
{Number: 2, Filename: "002.sql", Description: "second", Applied: false},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "sequential migrations partially applied",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: false},
{Number: 2, Filename: "002.sql", Description: "second", Applied: true},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "non-sequential migrations with mix of clean and dirty",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: true,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: true, // dirty
3: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: true},
{Number: 2, Filename: "002.sql", Description: "second", Applied: false},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "empty migrations list",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := buildMigrationHistory(tt.dbMeta, tt.appliedVersions)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildMigrationHistory() = %v, want %v", got, tt.want)
}
})
}
}
7 changes: 5 additions & 2 deletions cli/daemon/dash/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"encr.dev/cli/daemon/dash/apiproxy"
"encr.dev/cli/daemon/dash/dashproxy"
"encr.dev/cli/daemon/engine/trace2"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/run"
"encr.dev/cli/internal/jsonrpc2"
"encr.dev/internal/conf"
Expand All @@ -27,7 +28,7 @@ var upgrader = websocket.Upgrader{
}

// NewServer starts a new server and returns it.
func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, tr trace2.Store, dashPort int) *Server {
func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, nsMgr *namespace.Manager, tr trace2.Store, dashPort int) *Server {
proxy, err := dashproxy.New(conf.DevDashURL)
if err != nil {
log.Fatal().Err(err).Msg("could not create dash proxy")
Expand All @@ -45,6 +46,7 @@ func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, tr trace2.Store, dash
apiProxy: apiProxy,
apps: appsMgr,
run: runMgr,
ns: nsMgr,
tr: tr,
dashPort: dashPort,
traceCh: make(chan trace2.NewSpanEvent, 10),
Expand All @@ -64,6 +66,7 @@ type Server struct {
apiProxy *httputil.ReverseProxy
apps *apps.Manager
run *run.Manager
ns *namespace.Manager
tr trace2.Store
dashPort int
traceCh chan trace2.NewSpanEvent
Expand Down Expand Up @@ -96,7 +99,7 @@ func (s *Server) WebSocket(w http.ResponseWriter, req *http.Request) {

stream := &wsStream{c: c}
conn := jsonrpc2.NewConn(stream)
handler := &handler{rpc: conn, apps: s.apps, run: s.run, tr: s.tr, ai: s.ai}
handler := &handler{rpc: conn, apps: s.apps, run: s.run, ns: s.ns, tr: s.tr, ai: s.ai}
conn.Go(req.Context(), handler.Handle)

ch := make(chan *notification, 20)
Expand Down
31 changes: 31 additions & 0 deletions cli/daemon/sqldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ func (db *DB) doMigrate(ctx context.Context, cloudName, appRoot string, dbMeta *
return nil
}

func (db *DB) ListAppliedMigrations(ctx context.Context) (map[uint64]bool, error) {
conn, err := db.connectToDB(ctx)
if err != nil {
return nil, err
}
defer fns.CloseIgnore(conn)
return LoadAppliedVersions(ctx, conn, "public", "schema_migrations")
}

func RunMigration(ctx context.Context, dbName string, allowNonSeq bool, conn *sql.Conn, mdSrc *MetadataSource) (err error) {
var (
dbDriver database.Driver
Expand Down Expand Up @@ -477,3 +486,25 @@ func (db *DB) connectSuperuser(ctx context.Context) (*pgx.Conn, error) {
db.log.Debug().Err(err).Msgf("failed to connect to admin db")
return nil, fmt.Errorf("failed to connect to superuser database: %v", err)
}

// Connects as a superuser or admin to the database. Fails fast if the cluster
// is not running yet.
// On success the returned conn must be closed by the caller.
func (db *DB) connectToDB(ctx context.Context) (*sql.Conn, error) {
info, err := db.Cluster.Info(ctx)
if err != nil {
return nil, err
}
uri := info.ConnURI(db.EncoreName, info.Config.Superuser)
pool, err := sql.Open("pgx", uri)
if err != nil {
return nil, err
}
defer fns.CloseIgnore(pool)

conn, err := pool.Conn(ctx)
if err != nil {
return nil, err
}
return conn, nil
}
Loading

0 comments on commit 8bc0b1d

Please sign in to comment.