From 58fa7ae970c5311f5cd41ce609b62698e63ce939 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 12 Feb 2024 16:37:15 +0100 Subject: [PATCH] Add SQL `onComplete` flag, allow `sql` migration to run with others (#280) This change updates the `sql` operation to: * Allow for a new `onComplete` flag, that will make it run on the Complete phase, rather than doing it during Start (default behavior). * Allows for `sql` operations next to others in the same migration. We added this limitation to ensure this operation doesn't affect others, especially around schema state management. Having `sql` next to other operations has proven convenient sometimes, by adding `onComplete` flag, we can allow for these migrations to run and rely on views recreation based on the final state. --------- Co-authored-by: Andrew Farries --- docs/README.md | 20 ++++++++ examples/32_sql_on_complete.json | 11 +++++ pkg/jsonschema/testdata/sql-3.txtar | 18 +++++++ pkg/jsonschema/testdata/sql-4.txtar | 19 ++++++++ pkg/migrations/migrations.go | 8 +-- pkg/migrations/migrations_test.go | 17 ++++++- pkg/migrations/op_raw_sql.go | 18 +++++-- pkg/migrations/op_raw_sql_test.go | 75 +++++++++++++++++++++++++++++ pkg/migrations/types.go | 3 ++ pkg/roll/execute.go | 50 ++++++++++++++----- schema.json | 25 ++++++++++ 11 files changed, 244 insertions(+), 20 deletions(-) create mode 100644 examples/32_sql_on_complete.json create mode 100644 pkg/jsonschema/testdata/sql-3.txtar create mode 100644 pkg/jsonschema/testdata/sql-4.txtar diff --git a/docs/README.md b/docs/README.md index b059ce2a..89abb760 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1068,9 +1068,29 @@ A raw SQL operation runs arbitrary SQL against the database. This is intended as } ``` +By default, a `sql` operation cannot run together with other operations in the same migration. This is to ensure pgroll can correctly track the state of the database. However, it is possible to run a `sql` operation together with other operations by setting the `onComplete` flag to `true`. + +The `onComplete` flag will make this operation run the `up` expression on the complete phase (instead of the default, which is to run it on the start phase). + +`onComplete` flag is incompatible with `down` expression, as `pgroll` does not support running rollback after complete was executed. + + + + +```json +{ + "sql": { + "up": "SQL expression", + "onComplete": true + } +} +``` + Example **raw SQL** migrations: * [05_sql.json](../examples/05_sql.json) +* [32_sql_on_complete.json](../examples/32_sql_on_complete.json) + ### Rename table diff --git a/examples/32_sql_on_complete.json b/examples/32_sql_on_complete.json new file mode 100644 index 00000000..fb152af5 --- /dev/null +++ b/examples/32_sql_on_complete.json @@ -0,0 +1,11 @@ +{ + "name": "32_sql_on_complete", + "operations": [ + { + "sql": { + "up": "ALTER TABLE people ADD COLUMN birth_date timestamp", + "onComplete": true + } + } + ] +} diff --git a/pkg/jsonschema/testdata/sql-3.txtar b/pkg/jsonschema/testdata/sql-3.txtar new file mode 100644 index 00000000..c9b60eeb --- /dev/null +++ b/pkg/jsonschema/testdata/sql-3.txtar @@ -0,0 +1,18 @@ +This is a valid 'sql' migration. +It specifies `up`, and `on_complete` + +-- create_table.json -- +{ + "name": "migration_name", + "operations": [ + { + "sql": { + "up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)", + "onComplete": true + } + } + ] +} + +-- valid -- +true diff --git a/pkg/jsonschema/testdata/sql-4.txtar b/pkg/jsonschema/testdata/sql-4.txtar new file mode 100644 index 00000000..34a8732f --- /dev/null +++ b/pkg/jsonschema/testdata/sql-4.txtar @@ -0,0 +1,19 @@ +This is an invalid 'sql' migration. +It specifies `up`, `down` and `on_complete` + +-- create_table.json -- +{ + "name": "migration_name", + "operations": [ + { + "sql": { + "up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)", + "down": "DROP TABLE users", + "onComplete": true + } + } + ] +} + +-- valid -- +false diff --git a/pkg/migrations/migrations.go b/pkg/migrations/migrations.go index 2ea276e8..7182478c 100644 --- a/pkg/migrations/migrations.go +++ b/pkg/migrations/migrations.go @@ -35,11 +35,13 @@ type Operation interface { // IsolatedOperation is an operation that cannot be executed with other operations // in the same migration type IsolatedOperation interface { - IsIsolated() + // this operation is isolated when executed on start, cannot be executed with other operations + IsIsolated() bool } // RequiresSchemaRefreshOperation is an operation that requires the resulting schema to be refreshed type RequiresSchemaRefreshOperation interface { + // this operation requires the resulting schema to be refreshed when executed on start RequiresSchemaRefresh() } @@ -56,8 +58,8 @@ type ( // returns a descriptive error if the migration is invalid func (m *Migration) Validate(ctx context.Context, s *schema.Schema) error { for _, op := range m.Operations { - if _, ok := op.(IsolatedOperation); ok { - if len(m.Operations) > 1 { + if isolatedOp, ok := op.(IsolatedOperation); ok { + if isolatedOp.IsIsolated() && len(m.Operations) > 1 { return InvalidMigrationError{Reason: fmt.Sprintf("operation %q cannot be executed with other operations", OperationName(op))} } } diff --git a/pkg/migrations/migrations_test.go b/pkg/migrations/migrations_test.go index 80676770..a1461352 100644 --- a/pkg/migrations/migrations_test.go +++ b/pkg/migrations/migrations_test.go @@ -17,7 +17,7 @@ func TestMigrationsIsolated(t *testing.T) { &OpRawSQL{ Up: `foo`, }, - &OpRenameColumn{}, + &OpCreateTable{Name: "foo"}, }, } @@ -38,3 +38,18 @@ func TestMigrationsIsolatedValid(t *testing.T) { err := migration.Validate(context.TODO(), schema.New()) assert.NoError(t, err) } + +func TestOnCompleteSQLMigrationsAreNotIsolated(t *testing.T) { + migration := Migration{ + Name: "sql", + Operations: Operations{ + &OpRawSQL{ + Up: `foo`, + OnComplete: true, + }, + &OpCreateTable{Name: "foo"}, + }, + } + err := migration.Validate(context.TODO(), schema.New()) + assert.NoError(t, err) +} diff --git a/pkg/migrations/op_raw_sql.go b/pkg/migrations/op_raw_sql.go index 8d3a70b2..1719ed9b 100644 --- a/pkg/migrations/op_raw_sql.go +++ b/pkg/migrations/op_raw_sql.go @@ -12,14 +12,18 @@ import ( var _ Operation = (*OpRawSQL)(nil) func (o *OpRawSQL) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error { - _, err := conn.ExecContext(ctx, o.Up) - if err != nil { + if !o.OnComplete { + _, err := conn.ExecContext(ctx, o.Up) return err } return nil } func (o *OpRawSQL) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error { + if o.OnComplete { + _, err := conn.ExecContext(ctx, o.Up) + return err + } return nil } @@ -36,11 +40,15 @@ func (o *OpRawSQL) Validate(ctx context.Context, s *schema.Schema) error { return EmptyMigrationError{} } + if o.OnComplete && o.Down != "" { + return InvalidMigrationError{Reason: "down is not allowed with onComplete"} + } + return nil } -// this operation is isolated, cannot be executed with other operations -func (o *OpRawSQL) IsIsolated() {} +func (o *OpRawSQL) IsIsolated() bool { + return !o.OnComplete +} -// this operation requires the resulting schema to be refreshed func (o *OpRawSQL) RequiresSchemaRefresh() {} diff --git a/pkg/migrations/op_raw_sql_test.go b/pkg/migrations/op_raw_sql_test.go index 5931a58d..53fc8951 100644 --- a/pkg/migrations/op_raw_sql_test.go +++ b/pkg/migrations/op_raw_sql_test.go @@ -53,6 +53,81 @@ func TestRawSQL(t *testing.T) { }) }, }, + { + name: "raw SQL with onComplete", + migrations: []migrations.Migration{ + { + Name: "01_create_table", + Operations: migrations.Operations{ + &migrations.OpRawSQL{ + OnComplete: true, + Up: ` + CREATE TABLE test_table ( + id serial, + name text + ) + `, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB, schema string) { + // SQL didn't run yet + TableMustNotExist(t, db, schema, "test_table") + }, + afterComplete: func(t *testing.T, db *sql.DB, schema string) { + // table can be accessed after start + TableMustExist(t, db, schema, "test_table") + + // inserts work + MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{ + "name": "foo", + }) + }, + }, + { + name: "raw SQL after a migration with onComplete", + migrations: []migrations.Migration{ + { + Name: "01_create_table", + Operations: migrations.Operations{ + &migrations.OpCreateTable{ + Name: "test_table", + Columns: []migrations.Column{ + {Name: "id", Type: "serial"}, + {Name: "name", Type: "text"}, + }, + }, + &migrations.OpRawSQL{ + OnComplete: true, + Up: ` + ALTER TABLE test_table ADD COLUMN age int + `, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB, schema string) { + // SQL didn't run yet + ViewMustExist(t, db, schema, "01_create_table", "test_table") + ColumnMustNotExist(t, db, schema, "test_table", "age") + }, + afterRollback: func(t *testing.T, db *sql.DB, schema string) { + // table is dropped after rollback + TableMustNotExist(t, db, schema, "test_table") + }, + afterComplete: func(t *testing.T, db *sql.DB, schema string) { + // table can be accessed after start + TableMustExist(t, db, schema, "test_table") + ColumnMustExist(t, db, schema, "test_table", "age") + + // inserts work + MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{ + "name": "foo", + "age": "42", + }) + }, + }, { name: "migration on top of raw SQL", migrations: []migrations.Migration{ diff --git a/pkg/migrations/types.go b/pkg/migrations/types.go index aaaaaebc..cb583208 100644 --- a/pkg/migrations/types.go +++ b/pkg/migrations/types.go @@ -171,6 +171,9 @@ type OpRawSQL struct { // SQL expression for down migration Down string `json:"down,omitempty"` + // SQL expression will run on complete step (rather than on start) + OnComplete bool `json:"onComplete,omitempty"` + // SQL expression for up migration Up string `json:"up"` } diff --git a/pkg/roll/execute.go b/pkg/roll/execute.go index 95d30820..03374246 100644 --- a/pkg/roll/execute.go +++ b/pkg/roll/execute.go @@ -49,12 +49,15 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs . fmt.Errorf("unable to execute start operation: %w", err), errRollback) } - + // refresh schema when the op is isolated and requires a refresh (for example raw sql) + // we don't want to refresh the schema if the operation is not isolated as it would + // override changes made by other operations if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok { - // refresh schema - newSchema, err = m.state.ReadSchema(ctx, m.schema) - if err != nil { - return fmt.Errorf("unable to refresh schema: %w", err) + if isolatedOp, ok := op.(migrations.IsolatedOperation); ok && isolatedOp.IsIsolated() { + newSchema, err = m.state.ReadSchema(ctx, m.schema) + if err != nil { + return fmt.Errorf("unable to refresh schema: %w", err) + } } } } @@ -64,16 +67,21 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs . return nil } + // create views for the new version + return m.ensureViews(ctx, newSchema, migration.Name) +} + +func (m *Roll) ensureViews(ctx context.Context, schema *schema.Schema, version string) error { // create schema for the new version - versionSchema := VersionedSchemaName(m.schema, migration.Name) - _, err = m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema))) + versionSchema := VersionedSchemaName(m.schema, version) + _, err := m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema))) if err != nil { return err } // create views in the new schema - for name, table := range newSchema.Tables { - err = m.createView(ctx, migration.Name, name, table) + for name, table := range schema.Tables { + err = m.ensureView(ctx, version, name, table) if err != nil { return fmt.Errorf("unable to create view: %w", err) } @@ -112,11 +120,29 @@ func (m *Roll) Complete(ctx context.Context) error { } // execute operations + refreshViews := false for _, op := range migration.Operations { err := op.Complete(ctx, m.pgConn, schema) if err != nil { return fmt.Errorf("unable to execute complete operation: %w", err) } + + if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok { + refreshViews = true + } + } + + // recreate views for the new version (if some operations require it, ie SQL) + if refreshViews && !m.disableVersionSchemas { + schema, err = m.state.ReadSchema(ctx, m.schema) + if err != nil { + return fmt.Errorf("unable to read schema: %w", err) + } + + err = m.ensureViews(ctx, schema, migration.Name) + if err != nil { + return err + } } // mark as completed @@ -162,7 +188,7 @@ func (m *Roll) Rollback(ctx context.Context) error { } // create view creates a view for the new version of the schema -func (m *Roll) createView(ctx context.Context, version, name string, table schema.Table) error { +func (m *Roll) ensureView(ctx context.Context, version, name string, table schema.Table) error { columns := make([]string, 0, len(table.Columns)) for k, v := range table.Columns { columns = append(columns, fmt.Sprintf("%s AS %s", pq.QuoteIdentifier(v.Name), pq.QuoteIdentifier(k))) @@ -179,7 +205,9 @@ func (m *Roll) createView(ctx context.Context, version, name string, table schem } _, err := m.pgConn.ExecContext(ctx, - fmt.Sprintf("CREATE OR REPLACE VIEW %s.%s %s AS SELECT %s FROM %s", + fmt.Sprintf("BEGIN; DROP VIEW IF EXISTS %s.%s; CREATE VIEW %s.%s %s AS SELECT %s FROM %s; COMMIT", + pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)), + pq.QuoteIdentifier(name), pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)), pq.QuoteIdentifier(name), withOptions, diff --git a/schema.json b/schema.json index 5b532a3b..91f60bbd 100644 --- a/schema.json +++ b/schema.json @@ -312,9 +312,34 @@ "up": { "description": "SQL expression for up migration", "type": "string" + }, + "onComplete": { + "description": "SQL expression will run on complete step (rather than on start)", + "type": "boolean", + "default": false } }, "required": ["up"], + "oneOf": [ + { + "required": ["down"] + }, + { + "required": ["onComplete"] + }, + { + "not": { + "anyOf": [ + { + "required": ["down"] + }, + { + "required": ["onComplete"] + } + ] + } + } + ], "type": "object" }, "OpRenameTable": {