diff --git a/pkg/sql/alter_schema.go b/pkg/sql/alter_schema.go index ef9ffe34e918..6d09a1b51210 100644 --- a/pkg/sql/alter_schema.go +++ b/pkg/sql/alter_schema.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" @@ -92,6 +93,12 @@ func (p *planner) AlterSchema(ctx context.Context, n *tree.AlterSchema) (planNod } func (n *alterSchemaNode) startExec(params runParams) error { + // Exit early with an error if the schema is undergoing a declarative schema + // change. + if catalog.HasConcurrentDeclarativeSchemaChange(n.desc) { + return scerrors.ConcurrentSchemaChangeError(n.desc) + } + switch t := n.n.Cmd.(type) { case *tree.AlterSchemaRename: newName := string(t.NewName) diff --git a/pkg/sql/database.go b/pkg/sql/database.go index 6bf506ee155e..e83c66a549ab 100644 --- a/pkg/sql/database.go +++ b/pkg/sql/database.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/errors" ) @@ -73,6 +74,11 @@ func (p *planner) renameDatabase( func (p *planner) writeNonDropDatabaseChange( ctx context.Context, desc *dbdesc.Mutable, jobDesc string, ) error { + // Exit early with an error if the table is undergoing a declarative schema + // change. + if catalog.HasConcurrentDeclarativeSchemaChange(desc) { + return scerrors.ConcurrentSchemaChangeError(desc) + } if err := p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc); err != nil { return err } diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index 3a82e4328553..87d670025a98 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -16,9 +16,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -56,6 +58,11 @@ func (p *planner) writeSchemaDesc(ctx context.Context, desc *schemadesc.Mutable) func (p *planner) writeSchemaDescChange( ctx context.Context, desc *schemadesc.Mutable, jobDesc string, ) error { + // Exit early with an error if the schema is undergoing a declarative schema + // change. + if catalog.HasConcurrentDeclarativeSchemaChange(desc) { + return scerrors.ConcurrentSchemaChangeError(desc) + } record, recordExists := p.extendedEvalCtx.jobs.uniqueToCreate[desc.ID] if recordExists { // Update it. diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index a6883a93c26d..92053d3691af 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -591,9 +591,10 @@ func requireTableKeyCount( } // TestConcurrentSchemaChanges is an integration style tests where we issue many -// schema changes concurrently (renames, add/drop columns, and create/drop +// schema changes concurrently (drops, renames, add/drop columns, and create/drop // indexes) for a period of time and assert that they all successfully finish -// eventually. +// eventually. This test will also intentionally toggle different schema changer +// modes. func TestConcurrentSchemaChanges(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -621,20 +622,52 @@ func TestConcurrentSchemaChanges(t *testing.T) { } s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) dbName, scName, tblName := "testdb", "testsc", "t" - tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName)) - tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName)) - tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName)) - tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) + useLegacyOrDeclarative := func() error { + decl := rand.Intn(2) == 0 + if !decl { + _, err := sqlDB.Exec("SET use_declarative_schema_changer='off';") + return err + } + _, err := sqlDB.Exec("SET use_declarative_schema_changer='on';") + return err + } + + createSchema := func() error { + return testutils.SucceedsSoonError(func() error { + _, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName)) + if err != nil { + return err + } + _, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName)) + if err != nil { + return err + } + _, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName)) + if err != nil { + return err + } + _, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName)) + if err != nil { + return err + } + _, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) + if err != nil { + return err + } + return nil + }) + } + require.NoError(t, createSchema()) // repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until // `ctx` is cancelled. repeatWorkWithInterval := func( - workerName string, workInterval time.Duration, work func() error, + workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error, ) func(context.Context) error { return func(workerCtx context.Context) error { + sqlDB := s.SQLConn(t) + sqlDB.SetMaxOpenConns(1) for { jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32()) select { @@ -642,7 +675,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("%v is signaled to finish work", workerName) return nil case <-time.After(jitteredInterval): - if err := work(); err != nil { + if err := work(sqlDB); err != nil { t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error()) return err } @@ -652,7 +685,18 @@ func TestConcurrentSchemaChanges(t *testing.T) { } // A goroutine that repeatedly renames database `testdb` randomly. - g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error { + g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } + drop := rand.Intn(2) == 0 + if drop { + if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil { + return err + } + t.Logf("DROP DATABASE %v", dbName) + return createSchema() + } newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000)) if newDBName == dbName { return nil @@ -666,16 +710,30 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames schema `testdb.testsc` randomly. - g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error { + g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } + drop := rand.Intn(2) == 0 newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000)) if scName == newSCName { return nil } - _, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + var err error + if !drop { + _, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + } else { + _, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName)) + } if err == nil { - scName = newSCName - t.Logf("RENAME SCHEMA TO %v", newSCName) - } else if isPQErrWithCode(err, pgcode.UndefinedDatabase) { + if !drop { + scName = newSCName + t.Logf("RENAME SCHEMA TO %v", newSCName) + } else { + t.Logf("DROP SCHEMA TO %v", scName) + return createSchema() + } + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) { err = nil // mute those errors as they're expected t.Logf("Parent database is renamed; skipping this schema renaming.") } @@ -683,13 +741,27 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames table `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error { + g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } newTblName := fmt.Sprintf("t_%v", rand.Intn(1000)) - _, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + drop := rand.Intn(2) == 0 + var err error + if !drop { + _, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + } else { + _, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName)) + } if err == nil { - tblName = newTblName - t.Logf("RENAME TABLE TO %v", newTblName) - } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) { + if !drop { + tblName = newTblName + t.Logf("RENAME TABLE TO %v", newTblName) + } else { + t.Logf("DROP TABLE %v", newTblName) + return createSchema() + } + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) { err = nil t.Logf("Parent database or schema is renamed; skipping this table renaming.") } @@ -697,7 +769,10 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that adds columns to `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func() error { + g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } dbName, scName, tblName := dbName, scName, tblName newColName := fmt.Sprintf("col_%v", rand.Intn(1000)) @@ -714,7 +789,10 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops columns from `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error { + g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } // Randomly pick a non-PK column to drop. dbName, scName, tblName := dbName, scName, tblName colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName) @@ -727,7 +805,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { if err == nil { t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName) } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, - pgcode.InvalidSchemaName, pgcode.UndefinedTable) { + pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn, pgcode.ObjectNotInPrerequisiteState) { err = nil t.Logf("Parent database or schema or table is renamed; skipping this column removal.") } @@ -735,7 +813,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that creates secondary index on a randomly selected column. - g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error { + g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error { newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000)) // Randomly pick a non-PK column to create an index on. @@ -761,14 +839,16 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops a secondary index randomly. - g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error { + g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error { + if err := useLegacyOrDeclarative(); err != nil { + return err + } // Randomly pick a public, secondary index to drop. dbName, scName, tblName := dbName, scName, tblName indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName) if err != nil || indexName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) if err == nil { t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName) @@ -816,7 +896,7 @@ func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string colNameRow, err := sqlDB.Query(fmt.Sprintf(` SELECT index_name FROM [show indexes from %s.%s.%s] -WHERE index_name != 't_pkey' +WHERE index_name NOT LIKE '%%_pkey' ORDER BY random(); `, dbName, scName, tblName)) if err != nil { diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 4ada912955eb..f9b16e9b4ca7 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -275,6 +275,11 @@ func (p *planner) writeSchemaChange( return errors.Errorf("no schema changes allowed on table %q as it is being dropped", tableDesc.Name) } + // Exit early with an error if the table is undergoing a declarative schema + // change. + if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) { + return scerrors.ConcurrentSchemaChangeError(tableDesc) + } if !tableDesc.IsNew() { if err := p.createOrUpdateSchemaChangeJob(ctx, tableDesc, jobDesc, mutationID); err != nil { return err diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index a19f2e06bf91..0a73e7320519 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -38,6 +38,7 @@ import ( plpgsql "github.com/cockroachdb/cockroach/pkg/sql/plpgsql/parser" "github.com/cockroachdb/cockroach/pkg/sql/regions" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/plpgsqltree" "github.com/cockroachdb/cockroach/pkg/sql/sem/plpgsqltree/utils" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -107,6 +108,11 @@ func findTransitioningMembers(desc *typedesc.Mutable) ([][]byte, bool) { func (p *planner) writeTypeSchemaChange( ctx context.Context, typeDesc *typedesc.Mutable, jobDesc string, ) error { + // Exit early with an error if the table is undergoing a declarative schema + // change. + if catalog.HasConcurrentDeclarativeSchemaChange(typeDesc) { + return scerrors.ConcurrentSchemaChangeError(typeDesc) + } // Check if there is a cached specification for this type, otherwise create one. record, recordExists := p.extendedEvalCtx.jobs.uniqueToCreate[typeDesc.ID] transitioningMembers, beingDropped := findTransitioningMembers(typeDesc)