Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: block concurrent renames with declarative schema changes #128683

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/sql/alter_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
Expand Down Expand Up @@ -92,6 +93,12 @@ func (p *planner) AlterSchema(ctx context.Context, n *tree.AlterSchema) (planNod
}

func (n *alterSchemaNode) startExec(params runParams) error {
// Exit early with an error if the schema is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(n.desc) {
return scerrors.ConcurrentSchemaChangeError(n.desc)
}

switch t := n.n.Cmd.(type) {
case *tree.AlterSchemaRename:
newName := string(t.NewName)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -73,6 +74,11 @@ func (p *planner) renameDatabase(
func (p *planner) writeNonDropDatabaseChange(
ctx context.Context, desc *dbdesc.Mutable, jobDesc string,
) error {
// Exit early with an error if the table is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(desc) {
return scerrors.ConcurrentSchemaChangeError(desc)
}
if err := p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc); err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand Down Expand Up @@ -56,6 +58,11 @@ func (p *planner) writeSchemaDesc(ctx context.Context, desc *schemadesc.Mutable)
func (p *planner) writeSchemaDescChange(
ctx context.Context, desc *schemadesc.Mutable, jobDesc string,
) error {
// Exit early with an error if the schema is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(desc) {
return scerrors.ConcurrentSchemaChangeError(desc)
}
record, recordExists := p.extendedEvalCtx.jobs.uniqueToCreate[desc.ID]
if recordExists {
// Update it.
Expand Down
136 changes: 108 additions & 28 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,10 @@ func requireTableKeyCount(
}

// TestConcurrentSchemaChanges is an integration style tests where we issue many
// schema changes concurrently (renames, add/drop columns, and create/drop
// schema changes concurrently (drops, renames, add/drop columns, and create/drop
// indexes) for a period of time and assert that they all successfully finish
// eventually.
// eventually. This test will also intentionally toggle different schema changer
// modes.
func TestConcurrentSchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -621,28 +622,60 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(sqlDB)
dbName, scName, tblName := "testdb", "testsc", "t"
tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName))
tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName))
tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName))
tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
useLegacyOrDeclarative := func() error {
decl := rand.Intn(2) == 0
if !decl {
_, err := sqlDB.Exec("SET use_declarative_schema_changer='off';")
return err
}
_, err := sqlDB.Exec("SET use_declarative_schema_changer='on';")
return err
}

createSchema := func() error {
return testutils.SucceedsSoonError(func() error {
_, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
if err != nil {
return err
}
return nil
})
}
require.NoError(t, createSchema())

// repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until
// `ctx` is cancelled.
repeatWorkWithInterval := func(
workerName string, workInterval time.Duration, work func() error,
workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error,
) func(context.Context) error {
return func(workerCtx context.Context) error {
sqlDB := s.SQLConn(t)
sqlDB.SetMaxOpenConns(1)
for {
jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32())
select {
case <-workerCtx.Done():
t.Logf("%v is signaled to finish work", workerName)
return nil
case <-time.After(jitteredInterval):
if err := work(); err != nil {
if err := work(sqlDB); err != nil {
t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
return err
}
Expand All @@ -652,7 +685,18 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}

// A goroutine that repeatedly renames database `testdb` randomly.
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error {
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
drop := rand.Intn(2) == 0
if drop {
if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
return err
}
t.Logf("DROP DATABASE %v", dbName)
return createSchema()
}
newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000))
if newDBName == dbName {
return nil
Expand All @@ -666,38 +710,69 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames schema `testdb.testsc` randomly.
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error {
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
drop := rand.Intn(2) == 0
newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000))
if scName == newSCName {
return nil
}
_, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
}
if err == nil {
scName = newSCName
t.Logf("RENAME SCHEMA TO %v", newSCName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase) {
if !drop {
scName = newSCName
t.Logf("RENAME SCHEMA TO %v", newSCName)
} else {
t.Logf("DROP SCHEMA TO %v", scName)
return createSchema()
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) {
err = nil // mute those errors as they're expected
t.Logf("Parent database is renamed; skipping this schema renaming.")
}
return err
}))

// A goroutine that renames table `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error {
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
newTblName := fmt.Sprintf("t_%v", rand.Intn(1000))
_, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
drop := rand.Intn(2) == 0
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
}
if err == nil {
tblName = newTblName
t.Logf("RENAME TABLE TO %v", newTblName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) {
if !drop {
tblName = newTblName
t.Logf("RENAME TABLE TO %v", newTblName)
} else {
t.Logf("DROP TABLE %v", newTblName)
return createSchema()
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) {
err = nil
t.Logf("Parent database or schema is renamed; skipping this table renaming.")
}
return err
}))

// A goroutine that adds columns to `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func() error {
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
dbName, scName, tblName := dbName, scName, tblName
newColName := fmt.Sprintf("col_%v", rand.Intn(1000))

Expand All @@ -714,7 +789,10 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops columns from `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error {
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
// Randomly pick a non-PK column to drop.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
Expand All @@ -727,15 +805,15 @@ func TestConcurrentSchemaChanges(t *testing.T) {
if err == nil {
t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema,
pgcode.InvalidSchemaName, pgcode.UndefinedTable) {
pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn, pgcode.ObjectNotInPrerequisiteState) {
err = nil
t.Logf("Parent database or schema or table is renamed; skipping this column removal.")
}
return err
}))

// A goroutine that creates secondary index on a randomly selected column.
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error {
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error {
newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000))

// Randomly pick a non-PK column to create an index on.
Expand All @@ -761,14 +839,16 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops a secondary index randomly.
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error {
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
return err
}
// Randomly pick a public, secondary index to drop.
dbName, scName, tblName := dbName, scName, tblName
indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName)
if err != nil || indexName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
if err == nil {
t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName)
Expand Down Expand Up @@ -816,7 +896,7 @@ func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
SELECT index_name
FROM [show indexes from %s.%s.%s]
WHERE index_name != 't_pkey'
WHERE index_name NOT LIKE '%%_pkey'
ORDER BY random();
`, dbName, scName, tblName))
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ func (p *planner) writeSchemaChange(
return errors.Errorf("no schema changes allowed on table %q as it is being dropped",
tableDesc.Name)
}
// Exit early with an error if the table is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}
if !tableDesc.IsNew() {
if err := p.createOrUpdateSchemaChangeJob(ctx, tableDesc, jobDesc, mutationID); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/type_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
plpgsql "github.com/cockroachdb/cockroach/pkg/sql/plpgsql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/plpgsqltree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/plpgsqltree/utils"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -107,6 +108,11 @@ func findTransitioningMembers(desc *typedesc.Mutable) ([][]byte, bool) {
func (p *planner) writeTypeSchemaChange(
ctx context.Context, typeDesc *typedesc.Mutable, jobDesc string,
) error {
// Exit early with an error if the table is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(typeDesc) {
return scerrors.ConcurrentSchemaChangeError(typeDesc)
}
// Check if there is a cached specification for this type, otherwise create one.
record, recordExists := p.extendedEvalCtx.jobs.uniqueToCreate[typeDesc.ID]
transitioningMembers, beingDropped := findTransitioningMembers(typeDesc)
Expand Down
Loading