diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8d5a07bce..0b83bd71d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -5,7 +5,7 @@ on: [pull_request] jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index caa1f5714..07fa01d89 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -19,3 +19,5 @@ jobs: - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 + with: + version: v1.46.2 diff --git a/.github/workflows/replica-tests.yml b/.github/workflows/replica-tests.yml index e28c2bc3a..f2a52ece8 100644 --- a/.github/workflows/replica-tests.yml +++ b/.github/workflows/replica-tests.yml @@ -5,10 +5,10 @@ on: [pull_request] jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: - version: [mysql-5.7.25,mysql-8.0.16] + version: [mysql-5.7.25,mysql-8.0.16,PerconaServer-8.0.21] steps: - uses: actions/checkout@v2 diff --git a/.golangci.yml b/.golangci.yml index 4e0bc4fa5..e4ee4ab5d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,13 +1,30 @@ run: timeout: 5m - modules-download-mode: readonly - linters: disable: - errcheck enable: + - bodyclose + - containedctx + - contextcheck + - dogsled + - durationcheck + - errname + - errorlint + - execinquery + - gofmt + - ifshort + - misspell + - nilerr + - nilnil - noctx + - nolintlint + - nosprintfhostport + - prealloc - rowserrcheck - sqlclosecheck + - unconvert + - unparam - unused - + - wastedassign + - whitespace diff --git a/build.sh b/build.sh index 068bdfb2e..b75891553 100755 --- a/build.sh +++ b/build.sh @@ -35,7 +35,8 @@ function build { (cd $buildpath && tar cfz ./gh-ost-binary-${osshort}-${GOARCH}-${timestamp}.tar.gz $target) - if [ "$GOOS" == "linux" ] ; then + # build RPM and deb for Linux, x86-64 only + if [ "$GOOS" == "linux" ] && [ 
"$GOARCH" == "amd64" ] ; then echo "Creating Distro full packages" builddir=$(setuptree) cp $buildpath/$target $builddir/gh-ost/usr/bin diff --git a/doc/coding-ghost.md b/doc/coding-ghost.md index ee26f0c60..24425e35f 100644 --- a/doc/coding-ghost.md +++ b/doc/coding-ghost.md @@ -5,7 +5,7 @@ Getting started with gh-ost development is simple! - First obtain the repository with `git clone` or `go get`. -- From inside of the repository run `script/cibuild` +- From inside of the repository run `script/cibuild`. - This will bootstrap the environment if needed, format the code, build the code, and then run the unit test. ## CI build workflow @@ -14,6 +14,12 @@ Getting started with gh-ost development is simple! If additional steps are needed, please add them into this workflow so that the workflow remains simple. +## `golang-ci` linter + +To enfore best-practices, Pull Requests are automatically linted by [`golang-ci`](https://golangci-lint.run/). The linter config is located at [`.golangci.yml`](https://github.com/github/gh-ost/blob/master/.golangci.yml) and the `golangci-lint` GitHub Action is located at [`.github/workflows/golangci-lint.yml`](https://github.com/github/gh-ost/blob/master/.github/workflows/golangci-lint.yml). + +To run the `golang-ci` linters locally _(recommended before push)_, use `script/lint`. + ## Notes: Currently, `script/ensure-go-installed` will install `go` for Mac OS X and Linux. We welcome PR's to add other platforms. diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 417255a41..021462fa2 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -6,6 +6,10 @@ A more in-depth discussion of various `gh-ost` command line flags: implementatio Add this flag when executing on Aliyun RDS. +### allow-zero-in-date + +Allows the user to make schema changes that include a zero date or zero in date (e.g. 
adding a `datetime default '0000-00-00 00:00:00'` column), even if global `sql_mode` on MySQL has `NO_ZERO_IN_DATE,NO_ZERO_DATE`. + ### azure Add this flag when executing on Azure Database for MySQL. @@ -41,6 +45,22 @@ If you happen to _know_ your servers use RBR (Row Based Replication, i.e. `binlo Skipping this step means `gh-ost` would not need the `SUPER` privilege in order to operate. You may want to use this on Amazon RDS. +### attempt-instant-ddl + +MySQL 8.0 supports "instant DDL" for some operations. If an alter statement can be completed with instant DDL, only a metadata change is required internally. Instant operations include: + +- Adding a column +- Dropping a column +- Dropping an index +- Extending a varchar column +- Adding a virtual generated column + +It is not reliable to parse the `ALTER` statement to determine if it is instant or not. This is because the table might be in an older row format, or have some other incompatibility that is difficult to identify. + +`--attempt-instant-ddl` is disabled by default, but the risks of enabling it are relatively minor: `gh-ost` may need to acquire a metadata lock at the start of the operation. This is not a problem for most scenarios, but it could be a problem for users that start the DDL during a period with long running transactions. + +`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: @@ -226,6 +246,18 @@ Allows `gh-ost` to connect to the MySQL servers using encrypted connections, but `--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format). +### storage-engine +Default is `innodb`, and `rocksdb` support is currently experimental. InnoDB and RocksDB are both transactional engines, supporting both shared and exclusive row locks. 
+ +But RocksDB currently lacks a few features support compared to InnoDB: +- Gap Locks +- Foreign Key +- Generated Columns +- Spatial +- Geometry + +When `--storage-engine=rocksdb`, `gh-ost` will make some changes necessary (e.g. sets isolation level to `READ_COMMITTED`) to support RocksDB. + ### test-on-replica Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md) @@ -242,6 +274,14 @@ Provide a command delimited list of replicas; `gh-ost` will throttle when any of Provide an HTTP endpoint; `gh-ost` will issue `HEAD` requests on given URL and throttle whenever response status code is not `200`. The URL can be queried and updated dynamically via [interactive commands](interactive-commands.md). Empty URL disables the HTTP check. +### throttle-http-interval-millis + +Defaults to 100. Configures the HTTP throttle check interval in milliseconds. + +### throttle-http-timeout-millis + +Defaults to 1000 (1 second). Configures the HTTP throttler check timeout in milliseconds. + ### timestamp-old-table Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`. diff --git a/doc/requirements-and-limitations.md b/doc/requirements-and-limitations.md index 0521028b9..88642dce2 100644 --- a/doc/requirements-and-limitations.md +++ b/doc/requirements-and-limitations.md @@ -20,6 +20,8 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. 
Th - Switching your `binlog_format` to `ROW`, in the case where it is _not_ `ROW` and you explicitly specified `--switch-to-rbr` - If your replication is already in RBR (`binlog_format=ROW`) you can specify `--assume-rbr` to avoid the `STOP SLAVE/START SLAVE` operations, hence no need for `SUPER`. +- `gh-ost` uses the `REPEATABLE_READ` transaction isolation level for all MySQL connections, regardless of the server default. + - Running `--test-on-replica`: before the cut-over phase, `gh-ost` stops replication so that you can compare the two tables and satisfy that the migration is sound. ### Limitations diff --git a/doc/shared-key.md b/doc/shared-key.md index c7f24cc5b..3dfa39b7c 100644 --- a/doc/shared-key.md +++ b/doc/shared-key.md @@ -29,7 +29,7 @@ CREATE TABLE tbl ( (This is also the definition of the _ghost_ table, except that that table would be called `_tbl_gho`). -In this migration, the _before_ and _after_ versions contain the same unique not-null key (the PRIMARY KEY). To run this migration, `gh-ost` would iterate through the `tbl` table using the primary key, copy rows from `tbl` to the _ghost_ table `_tbl_gho` in primary key order, while also applying the binlog event writes from `tble` onto `_tbl_gho`. +In this migration, the _before_ and _after_ versions contain the same unique not-null key (the PRIMARY KEY). To run this migration, `gh-ost` would iterate through the `tbl` table using the primary key, copy rows from `tbl` to the _ghost_ table `_tbl_gho` in primary key order, while also applying the binlog event writes from `tbl` onto `_tbl_gho`. The applying of the binlog events is what requires the shared unique key. For example, an `UPDATE` statement to `tbl` translates to a `REPLACE` statement which `gh-ost` applies to `_tbl_gho`. A `REPLACE` statement expects to insert or replace an existing row based on its row's values and the table's unique key constraints. 
In particular, if inserting that row would result in a unique key violation (e.g., a row with that primary key already exists), it would _replace_ that existing row with the new values. diff --git a/go/base/context.go b/go/base/context.go index 66712a2b3..b7e0ad458 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -92,6 +92,7 @@ type MigrationContext struct { AssumeRBR bool SkipForeignKeyChecks bool SkipStrictMode bool + AllowZeroInDate bool NullableUniqueKeyAllowed bool ApproveRenamedColumns bool SkipRenamedColumns bool @@ -100,6 +101,7 @@ type MigrationContext struct { AliyunRDS bool GoogleCloudPlatform bool AzureMySQL bool + AttemptInstantDDL bool config ContextConfig configMutex *sync.Mutex @@ -289,6 +291,19 @@ func NewMigrationContext() *MigrationContext { } } +func (this *MigrationContext) SetConnectionConfig(storageEngine string) error { + var transactionIsolation string + switch storageEngine { + case "rocksdb": + transactionIsolation = "READ-COMMITTED" + default: + transactionIsolation = "REPEATABLE-READ" + } + this.InspectorConnectionConfig.TransactionIsolation = transactionIsolation + this.ApplierConnectionConfig.TransactionIsolation = transactionIsolation + return nil +} + func getSafeTableName(baseName string, suffix string) string { name := fmt.Sprintf("~%s_%s", baseName, suffix) if len(name) <= mysql.MaxTableNameLength { @@ -438,6 +453,10 @@ func (this *MigrationContext) IsTransactionalTable() bool { { return true } + case "rocksdb": + { + return true + } } return false } @@ -869,7 +888,7 @@ func (this *MigrationContext) ReadConfigFile() error { if cfg.Section("osc").HasKey("chunk_size") { this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64() if err != nil { - return fmt.Errorf("Unable to read osc chunk size: %s", err.Error()) + return fmt.Errorf("Unable to read osc chunk size: %w", err) } } @@ -884,7 +903,7 @@ func (this *MigrationContext) ReadConfigFile() error { if cfg.Section("osc").HasKey("max_lag_millis") { 
this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64() if err != nil { - return fmt.Errorf("Unable to read max lag millis: %s", err.Error()) + return fmt.Errorf("Unable to read max lag millis: %w", err) } } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 72771c80e..8aa567e0c 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -67,6 +67,9 @@ func main() { flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") + flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first") + storageEngine := flag.String("storage-engine", "innodb", "Specify table storage engine (default: 'innodb'). When 'rocksdb': the session transaction isolation level is changed from REPEATABLE_READ to READ_COMMITTED.") + flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") @@ -78,6 +81,7 @@ func main() { flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. 
This is useful for intentional dropping of foreign keys") flag.BoolVar(&migrationContext.SkipForeignKeyChecks, "skip-foreign-key-checks", false, "set to 'true' when you know for certain there are no foreign keys on your table, and wish to skip the time it takes for gh-ost to verify that") flag.BoolVar(&migrationContext.SkipStrictMode, "skip-strict-mode", false, "explicitly tell gh-ost binlog applier not to enforce strict sql mode") + flag.BoolVar(&migrationContext.AllowZeroInDate, "allow-zero-in-date", false, "explicitly tell gh-ost binlog applier to ignore NO_ZERO_IN_DATE,NO_ZERO_DATE in sql_mode") flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.") flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).") flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.") @@ -180,6 +184,10 @@ func main() { migrationContext.Log.SetLevel(zap.ErrorLevel) } + if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { + migrationContext.Log.Fatale(err) + } + if migrationContext.AlterStatement == "" { migrationContext.Log.Fatalf("--alter must be provided and statement must not be empty") } @@ -207,43 +215,46 @@ func main() { } migrationContext.Noop = !(*executeFlag) if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica { - migrationContext.Log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive") + migrationContext.Log.Fatal("--allow-on-master and --test-on-replica are mutually exclusive") } if migrationContext.AllowedRunningOnMaster && migrationContext.MigrateOnReplica { - migrationContext.Log.Fatalf("--allow-on-master and --migrate-on-replica are mutually exclusive") + migrationContext.Log.Fatal("--allow-on-master and --migrate-on-replica are mutually exclusive") } if migrationContext.MigrateOnReplica && 
migrationContext.TestOnReplica { - migrationContext.Log.Fatalf("--migrate-on-replica and --test-on-replica are mutually exclusive") + migrationContext.Log.Fatal("--migrate-on-replica and --test-on-replica are mutually exclusive") } if migrationContext.SwitchToRowBinlogFormat && migrationContext.AssumeRBR { - migrationContext.Log.Fatalf("--switch-to-rbr and --assume-rbr are mutually exclusive") + migrationContext.Log.Fatal("--switch-to-rbr and --assume-rbr are mutually exclusive") } if migrationContext.TestOnReplicaSkipReplicaStop { if !migrationContext.TestOnReplica { - migrationContext.Log.Fatalf("--test-on-replica-skip-replica-stop requires --test-on-replica to be enabled") + migrationContext.Log.Fatal("--test-on-replica-skip-replica-stop requires --test-on-replica to be enabled") } migrationContext.Log.Warning("--test-on-replica-skip-replica-stop enabled. We will not stop replication before cut-over. Ensure you have a plugin that does this.") } if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" { - migrationContext.Log.Fatalf("--master-user requires --assume-master-host") + migrationContext.Log.Fatal("--master-user requires --assume-master-host") } if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { - migrationContext.Log.Fatalf("--master-password requires --assume-master-host") + migrationContext.Log.Fatal("--master-password requires --assume-master-host") } if migrationContext.TLSCACertificate != "" && !migrationContext.UseTLS { - migrationContext.Log.Fatalf("--ssl-ca requires --ssl") + migrationContext.Log.Fatal("--ssl-ca requires --ssl") } if migrationContext.TLSCertificate != "" && !migrationContext.UseTLS { - migrationContext.Log.Fatalf("--ssl-cert requires --ssl") + migrationContext.Log.Fatal("--ssl-cert requires --ssl") } if migrationContext.TLSKey != "" && !migrationContext.UseTLS { - migrationContext.Log.Fatalf("--ssl-key requires --ssl") + 
migrationContext.Log.Fatal("--ssl-key requires --ssl") } if migrationContext.TLSAllowInsecure && !migrationContext.UseTLS { - migrationContext.Log.Fatalf("--ssl-allow-insecure requires --ssl") + migrationContext.Log.Fatal("--ssl-allow-insecure requires --ssl") } if *replicationLagQuery != "" { - migrationContext.Log.Warningf("--replication-lag-query is deprecated") + migrationContext.Log.Warning("--replication-lag-query is deprecated") + } + if *storageEngine == "rocksdb" { + migrationContext.Log.Warning("RocksDB storage engine support is experimental") } switch *cutOver { @@ -271,7 +282,7 @@ func main() { } if *askPass { fmt.Println("Password:") - bytePassword, err := term.ReadPassword(int(syscall.Stdin)) + bytePassword, err := term.ReadPassword(syscall.Stdin) if err != nil { migrationContext.Log.Fatale(err) } @@ -301,10 +312,9 @@ func main() { acceptSignals(migrationContext) migrator := logic.NewMigrator(migrationContext, AppVersion) - err := migrator.Migrate() - if err != nil { + if err := migrator.Migrate(); err != nil { migrator.ExecOnFailureHook() migrationContext.Log.Fatale(err) } - fmt.Fprintf(os.Stdout, "# Done\n") + fmt.Fprintln(os.Stdout, "# Done") } diff --git a/go/logic/applier.go b/go/logic/applier.go index 30b04be81..0a5da67aa 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -8,6 +8,7 @@ package logic import ( gosql "database/sql" "fmt" + "strings" "sync/atomic" "time" @@ -47,7 +48,7 @@ func newDmlBuildResultError(err error) *dmlBuildResult { } } -// Applier connects and writes the the applier-server, which is the server where migration +// Applier connects and writes the applier-server, which is the server where migration // happens. This is typically the master, but could be a replica when `--test-on-replica` or // `--execute-on-replica` are given. // Applier is the one to actually write row data and apply binlog events onto the ghost table. 
@@ -116,6 +117,33 @@ func (this *Applier) validateAndReadTimeZone() error { return nil } +// generateSqlModeQuery return a `sql_mode = ...` query, to be wrapped with a `set session` or `set global`, +// based on gh-ost configuration: +// - User may skip strict mode +// - User may allow zero dats or zero in dates +func (this *Applier) generateSqlModeQuery() string { + sqlModeAddendum := []string{`NO_AUTO_VALUE_ON_ZERO`} + if !this.migrationContext.SkipStrictMode { + sqlModeAddendum = append(sqlModeAddendum, `STRICT_ALL_TABLES`) + } + sqlModeQuery := fmt.Sprintf("CONCAT(@@session.sql_mode, ',%s')", strings.Join(sqlModeAddendum, ",")) + if this.migrationContext.AllowZeroInDate { + sqlModeQuery = fmt.Sprintf("REPLACE(REPLACE(%s, 'NO_ZERO_IN_DATE', ''), 'NO_ZERO_DATE', '')", sqlModeQuery) + } + + return fmt.Sprintf("sql_mode = %s", sqlModeQuery) +} + +// generateInstantDDLQuery returns the SQL for this ALTER operation +// with an INSTANT assertion (requires MySQL 8.0+) +func (this *Applier) generateInstantDDLQuery() string { + return fmt.Sprintf(`ALTER /* gh-ost */ TABLE %s.%s %s, ALGORITHM=INSTANT`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + this.migrationContext.AlterStatementOptions, + ) +} + // readTableColumns reads table columns on applier func (this *Applier) readTableColumns() (err error) { this.migrationContext.Log.Infof("Examining table structure on applier") @@ -169,6 +197,27 @@ func (this *Applier) ValidateOrDropExistingTables() error { return nil } +// AttemptInstantDDL attempts to use instant DDL (from MySQL 8.0, and earlier in Aurora and some others). +// If successful, the operation is only a meta-data change so a lot of time is saved! +// The risk of attempting to instant DDL when not supported is that a metadata lock may be acquired. +// This is minor, since gh-ost will eventually require a metadata lock anyway, but at the cut-over stage. 
+// Instant operations include: +// - Adding a column +// - Dropping a column +// - Dropping an index +// - Extending a VARCHAR column +// - Adding a virtual generated column +// It is not reliable to parse the `alter` statement to determine if it is instant or not. +// This is because the table might be in an older row format, or have some other incompatibility +// that is difficult to identify. +func (this *Applier) AttemptInstantDDL() error { + query := this.generateInstantDDLQuery() + this.migrationContext.Log.Infof("INSTANT DDL query is: %s", query) + // We don't need a trx, because for instant DDL the SQL mode doesn't matter. + _, err := this.db.Exec(query) + return err +} + // CreateGhostTable creates the ghost table on the applier host func (this *Applier) CreateGhostTable() error { query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`, @@ -181,11 +230,33 @@ func (this *Applier) CreateGhostTable() error { sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), ) - if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { - return err - } - this.migrationContext.Log.Infof("Ghost table created") - return nil + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) + + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(query); err != nil { + return err + } + this.migrationContext.Log.Infof("Ghost table created") + if err := tx.Commit(); err != nil { + // Neither SET SESSION nor ALTER are really transactional, so strictly speaking + // there's no need to commit; but let's do this the legit way anyway. 
+ return err + } + return nil + }() + + return err } // AlterGhost applies `alter` statement on ghost table @@ -200,11 +271,33 @@ func (this *Applier) AlterGhost() error { sql.EscapeName(this.migrationContext.GetGhostTableName()), ) this.migrationContext.Log.Debugf("ALTER statement: %s", query) - if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { - return err - } - this.migrationContext.Log.Infof("Ghost table altered") - return nil + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) + + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(query); err != nil { + return err + } + this.migrationContext.Log.Infof("Ghost table altered") + if err := tx.Commit(); err != nil { + // Neither SET SESSION nor ALTER are really transactional, so strictly speaking + // there's no need to commit; but let's do this the legit way anyway. 
+ return err + } + return nil + }() + + return err } // AlterGhost applies `alter` statement on ghost table @@ -375,14 +468,15 @@ func (this *Applier) ExecuteThrottleQuery() (int64, error) { return result, nil } -// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy -func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { +// readMigrationMinValues returns the minimum values to be iterated on rowcopy +func (this *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns) if err != nil { return err } - rows, err := this.db.Query(query) + + rows, err := tx.Query(query) if err != nil { return err } @@ -399,14 +493,15 @@ func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { return err } -// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy -func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error { +// readMigrationMaxValues returns the maximum values to be iterated on rowcopy +func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns) if err != nil { return err } - rows, err := this.db.Query(query) + + rows, err := tx.Query(query) if err != nil { return err } @@ -445,13 +540,20 @@ func (this *Applier) ReadMigrationRangeValues() error { return err } - if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil { + tx, err := this.db.Begin() + if err != nil { + return err + } + defer 
tx.Rollback() + + if err := this.readMigrationMinValues(tx, this.migrationContext.UniqueKey); err != nil { return err } - if err := this.ReadMigrationMaxValues(this.migrationContext.UniqueKey); err != nil { + if err := this.readMigrationMaxValues(tx, this.migrationContext.UniqueKey); err != nil { return err } - return nil + + return tx.Commit() } // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, @@ -534,12 +636,9 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return nil, err } defer tx.Rollback() + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone) - sqlModeAddendum := `,NO_AUTO_VALUE_ON_ZERO` - if !this.migrationContext.SkipStrictMode { - sqlModeAddendum = fmt.Sprintf("%s,STRICT_ALL_TABLES", sqlModeAddendum) - } - sessionQuery = fmt.Sprintf("%s, sql_mode = CONCAT(@@session.sql_mode, ',%s')", sessionQuery, sqlModeAddendum) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) if _, err := tx.Exec(sessionQuery); err != nil { return nil, err @@ -1034,7 +1133,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { - var totalDelta int64 err := func() error { @@ -1049,12 +1147,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } sessionQuery := "SET SESSION time_zone = '+00:00'" - - sqlModeAddendum := `,NO_AUTO_VALUE_ON_ZERO` - if !this.migrationContext.SkipStrictMode { - sqlModeAddendum = fmt.Sprintf("%s,STRICT_ALL_TABLES", sqlModeAddendum) - } - sessionQuery = fmt.Sprintf("%s, sql_mode = CONCAT(@@session.sql_mode, ',%s')", sessionQuery, sqlModeAddendum) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) if _, err := tx.Exec(sessionQuery); err != nil { return 
rollback(err) @@ -1066,7 +1159,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } result, err := tx.Exec(buildResult.query, buildResult.args...) if err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) + err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) return rollback(err) } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go new file mode 100644 index 000000000..a3563511f --- /dev/null +++ b/go/logic/applier_test.go @@ -0,0 +1,185 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "strings" + "testing" + + test "github.com/openark/golib/tests" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/sql" +) + +func TestApplierGenerateSqlModeQuery(t *testing.T) { + migrationContext := base.NewMigrationContext() + applier := NewApplier(migrationContext) + + { + test.S(t).ExpectEquals( + applier.generateSqlModeQuery(), + `sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO,STRICT_ALL_TABLES')`, + ) + } + { + migrationContext.SkipStrictMode = true + migrationContext.AllowZeroInDate = false + test.S(t).ExpectEquals( + applier.generateSqlModeQuery(), + `sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')`, + ) + } + { + migrationContext.SkipStrictMode = false + migrationContext.AllowZeroInDate = true + test.S(t).ExpectEquals( + applier.generateSqlModeQuery(), + `sql_mode = REPLACE(REPLACE(CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO,STRICT_ALL_TABLES'), 'NO_ZERO_IN_DATE', ''), 'NO_ZERO_DATE', '')`, + ) + } + { + migrationContext.SkipStrictMode = true + migrationContext.AllowZeroInDate = true + test.S(t).ExpectEquals( + applier.generateSqlModeQuery(), + `sql_mode = REPLACE(REPLACE(CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO'), 'NO_ZERO_IN_DATE', ''), 
'NO_ZERO_DATE', '')`, + ) + } +} + +func TestApplierUpdateModifiesUniqueKeyColumns(t *testing.T) { + columns := sql.NewColumnList([]string{"id", "item_id"}) + columnValues := sql.ToColumnValues([]interface{}{123456, 42}) + + migrationContext := base.NewMigrationContext() + migrationContext.OriginalTableColumns = columns + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: t.Name(), + Columns: *columns, + } + + applier := NewApplier(migrationContext) + + t.Run("unmodified", func(t *testing.T) { + modifiedColumn, isModified := applier.updateModifiesUniqueKeyColumns(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.UpdateDML, + NewColumnValues: columnValues, + WhereColumnValues: columnValues, + }) + test.S(t).ExpectEquals(modifiedColumn, "") + test.S(t).ExpectFalse(isModified) + }) + + t.Run("modified", func(t *testing.T) { + modifiedColumn, isModified := applier.updateModifiesUniqueKeyColumns(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.UpdateDML, + NewColumnValues: sql.ToColumnValues([]interface{}{123456, 24}), + WhereColumnValues: columnValues, + }) + test.S(t).ExpectEquals(modifiedColumn, "item_id") + test.S(t).ExpectTrue(isModified) + }) +} + +func TestApplierBuildDMLEventQuery(t *testing.T) { + columns := sql.NewColumnList([]string{"id", "item_id"}) + columnValues := sql.ToColumnValues([]interface{}{123456, 42}) + + migrationContext := base.NewMigrationContext() + migrationContext.OriginalTableName = "test" + migrationContext.OriginalTableColumns = columns + migrationContext.SharedColumns = columns + migrationContext.MappedSharedColumns = columns + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: t.Name(), + Columns: *columns, + } + + applier := NewApplier(migrationContext) + + t.Run("delete", func(t *testing.T) { + binlogEvent := &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.DeleteDML, + WhereColumnValues: columnValues, + } + + res := applier.buildDMLEventQuery(binlogEvent) + test.S(t).ExpectEquals(len(res), 1) 
+ test.S(t).ExpectNil(res[0].err) + test.S(t).ExpectEquals(strings.TrimSpace(res[0].query), + `delete /* gh-ost `+"`test`.`_test_gho`"+` */ + from + `+"`test`.`_test_gho`"+` + where + ((`+"`id`"+` = ?) and (`+"`item_id`"+` = ?))`) + + test.S(t).ExpectEquals(len(res[0].args), 2) + test.S(t).ExpectEquals(res[0].args[0], 123456) + test.S(t).ExpectEquals(res[0].args[1], 42) + }) + + t.Run("insert", func(t *testing.T) { + binlogEvent := &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + } + res := applier.buildDMLEventQuery(binlogEvent) + test.S(t).ExpectEquals(len(res), 1) + test.S(t).ExpectNil(res[0].err) + test.S(t).ExpectEquals(strings.TrimSpace(res[0].query), + `replace /* gh-ost `+"`test`.`_test_gho`"+` */ into + `+"`test`.`_test_gho`"+` + `+"(`id`, `item_id`)"+` + values + (?, ?)`) + test.S(t).ExpectEquals(len(res[0].args), 2) + test.S(t).ExpectEquals(res[0].args[0], 123456) + test.S(t).ExpectEquals(res[0].args[1], 42) + }) + + t.Run("update", func(t *testing.T) { + binlogEvent := &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.UpdateDML, + NewColumnValues: columnValues, + WhereColumnValues: columnValues, + } + res := applier.buildDMLEventQuery(binlogEvent) + test.S(t).ExpectEquals(len(res), 1) + test.S(t).ExpectNil(res[0].err) + test.S(t).ExpectEquals(strings.TrimSpace(res[0].query), + `update /* gh-ost `+"`test`.`_test_gho`"+` */ + `+"`test`.`_test_gho`"+` + set + `+"`id`"+`=?, `+"`item_id`"+`=? + where + ((`+"`id`"+` = ?) 
and (`+"`item_id`"+` = ?))`) + test.S(t).ExpectEquals(len(res[0].args), 4) + test.S(t).ExpectEquals(res[0].args[0], 123456) + test.S(t).ExpectEquals(res[0].args[1], 42) + test.S(t).ExpectEquals(res[0].args[2], 123456) + test.S(t).ExpectEquals(res[0].args[3], 42) + }) +} + +func TestApplierInstantDDL(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.DatabaseName = "test" + migrationContext.OriginalTableName = "mytable" + migrationContext.AlterStatementOptions = "ADD INDEX (foo)" + applier := NewApplier(migrationContext) + + t.Run("instantDDLstmt", func(t *testing.T) { + stmt := applier.generateInstantDDLQuery() + test.S(t).ExpectEquals(stmt, "ALTER /* gh-ost */ TABLE `test`.`mytable` ADD INDEX (foo), ALGORITHM=INSTANT") + }) +} diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 0ff296d82..2543f8e9a 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -7,6 +7,7 @@ package logic import ( "fmt" + "io" "os" "os/exec" "path/filepath" @@ -34,18 +35,16 @@ const ( type HooksExecutor struct { migrationContext *base.MigrationContext + writer io.Writer } func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor { return &HooksExecutor{ migrationContext: migrationContext, + writer: os.Stderr, } } -func (this *HooksExecutor) initHooks() error { - return nil -} - func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []string { env := os.Environ() env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName)) @@ -76,13 +75,13 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ } // executeHook executes a command, and sets relevant environment variables -// combined output & error are printed to gh-ost's standard error. +// combined output & error are printed to the configured writer. 
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error { cmd := exec.Command(hook) cmd.Env = this.applyEnvironmentVariables(extraVariables...) combinedOutput, err := cmd.CombinedOutput() - fmt.Fprintln(os.Stderr, string(combinedOutput)) + fmt.Fprintln(this.writer, string(combinedOutput)) return log.Errore(err) } diff --git a/go/logic/hooks_test.go b/go/logic/hooks_test.go new file mode 100644 index 000000000..3b28afe89 --- /dev/null +++ b/go/logic/hooks_test.go @@ -0,0 +1,113 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "bufio" + "bytes" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/openark/golib/tests" + + "github.com/github/gh-ost/go/base" +) + +func TestHooksExecutorExecuteHooks(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.AlterStatement = "ENGINE=InnoDB" + migrationContext.DatabaseName = "test" + migrationContext.Hostname = "test.example.com" + migrationContext.OriginalTableName = "tablename" + migrationContext.RowsDeltaEstimate = 1 + migrationContext.RowsEstimate = 122 + migrationContext.TotalRowsCopied = 123456 + migrationContext.SetETADuration(time.Minute) + migrationContext.SetProgressPct(50) + hooksExecutor := NewHooksExecutor(migrationContext) + + writeTmpHookFunc := func(testName, hookName, script string) (path string, err error) { + if path, err = os.MkdirTemp("", testName); err != nil { + return path, err + } + err = os.WriteFile(filepath.Join(path, hookName), []byte(script), 0777) + return path, err + } + + t.Run("does-not-exist", func(t *testing.T) { + migrationContext.HooksPath = "/does/not/exist" + tests.S(t).ExpectNil(hooksExecutor.executeHooks("test-hook")) + }) + + t.Run("failed", func(t *testing.T) { + var err error + if migrationContext.HooksPath, err = writeTmpHookFunc( + "TestHooksExecutorExecuteHooks-failed", + "failed-hook", + 
"#!/bin/sh\nexit 1", + ); err != nil { + panic(err) + } + defer os.RemoveAll(migrationContext.HooksPath) + tests.S(t).ExpectNotNil(hooksExecutor.executeHooks("failed-hook")) + }) + + t.Run("success", func(t *testing.T) { + var err error + if migrationContext.HooksPath, err = writeTmpHookFunc( + "TestHooksExecutorExecuteHooks-success", + "success-hook", + "#!/bin/sh\nenv", + ); err != nil { + panic(err) + } + defer os.RemoveAll(migrationContext.HooksPath) + + var buf bytes.Buffer + hooksExecutor.writer = &buf + tests.S(t).ExpectNil(hooksExecutor.executeHooks("success-hook", "TEST="+t.Name())) + + scanner := bufio.NewScanner(&buf) + for scanner.Scan() { + split := strings.SplitN(scanner.Text(), "=", 2) + switch split[0] { + case "GH_OST_COPIED_ROWS": + copiedRows, _ := strconv.ParseInt(split[1], 10, 64) + tests.S(t).ExpectEquals(copiedRows, migrationContext.TotalRowsCopied) + case "GH_OST_DATABASE_NAME": + tests.S(t).ExpectEquals(split[1], migrationContext.DatabaseName) + case "GH_OST_DDL": + tests.S(t).ExpectEquals(split[1], migrationContext.AlterStatement) + case "GH_OST_DRY_RUN": + tests.S(t).ExpectEquals(split[1], "false") + case "GH_OST_ESTIMATED_ROWS": + estimatedRows, _ := strconv.ParseInt(split[1], 10, 64) + tests.S(t).ExpectEquals(estimatedRows, int64(123)) + case "GH_OST_ETA_SECONDS": + etaSeconds, _ := strconv.ParseInt(split[1], 10, 64) + tests.S(t).ExpectEquals(etaSeconds, int64(60)) + case "GH_OST_EXECUTING_HOST": + tests.S(t).ExpectEquals(split[1], migrationContext.Hostname) + case "GH_OST_GHOST_TABLE_NAME": + tests.S(t).ExpectEquals(split[1], fmt.Sprintf("_%s_gho", migrationContext.OriginalTableName)) + case "GH_OST_OLD_TABLE_NAME": + tests.S(t).ExpectEquals(split[1], fmt.Sprintf("_%s_del", migrationContext.OriginalTableName)) + case "GH_OST_PROGRESS": + progress, _ := strconv.ParseFloat(split[1], 64) + tests.S(t).ExpectEquals(progress, 50.0) + case "GH_OST_TABLE_NAME": + tests.S(t).ExpectEquals(split[1], migrationContext.OriginalTableName) + case 
"TEST": + tests.S(t).ExpectEquals(split[1], t.Name()) + } + } + }) +} diff --git a/go/logic/inspect.go b/go/logic/inspect.go index a56210204..3ece8ab26 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -8,6 +8,7 @@ package logic import ( "context" gosql "database/sql" + "errors" "fmt" "reflect" "strings" @@ -132,10 +133,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { if err != nil { return err } - sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) - if err != nil { - return err - } + sharedUniqueKeys := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) for i, sharedUniqueKey := range sharedUniqueKeys { this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) uniqueKeyIsValid := true @@ -192,6 +190,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name) this.migrationContext.MappedSharedColumns.SetEnumValues(column.Name, column.EnumValues) } + if column.Name == mappedColumn.Name && column.Charset != mappedColumn.Charset { + this.migrationContext.SharedColumns.SetCharsetConversion(column.Name, column.Charset, mappedColumn.Charset) + } } for _, column := range this.migrationContext.UniqueKey.Columns.Columns() { @@ -554,13 +555,11 @@ func (this *Inspector) CountTableRows(ctx context.Context) error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) var rowsEstimate int64 if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { - switch err { - case context.Canceled, context.DeadlineExceeded: + if errors.Is(err, context.Canceled) || errors.Is(err, 
context.DeadlineExceeded) { this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return mysql.Kill(this.db, connectionID) - default: - return err } + return err } // row count query finished. nil out the cancel func, so the main migration thread @@ -728,7 +727,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* // getSharedUniqueKeys returns the intersection of two given unique keys, // testing by list of columns -func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) { +func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys []*sql.UniqueKey) (uniqueKeys []*sql.UniqueKey) { // We actually do NOT rely on key name, just on the set of columns. This is because maybe // the ALTER is on the name itself... for _, originalUniqueKey := range originalUniqueKeys { @@ -738,7 +737,7 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [ } } } - return uniqueKeys, nil + return uniqueKeys } // getSharedColumns returns the intersection of two lists of columns in same order as the first list diff --git a/go/logic/inspect_test.go b/go/logic/inspect_test.go new file mode 100644 index 000000000..54bc48ff0 --- /dev/null +++ b/go/logic/inspect_test.go @@ -0,0 +1,31 @@ +/* + Copyright 2022 GitHub Inc. 
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "testing" + + test "github.com/openark/golib/tests" + + "github.com/github/gh-ost/go/sql" +) + +func TestInspectGetSharedUniqueKeys(t *testing.T) { + origUniqKeys := []*sql.UniqueKey{ + {Columns: *sql.NewColumnList([]string{"id", "item_id"})}, + {Columns: *sql.NewColumnList([]string{"id", "org_id"})}, + } + ghostUniqKeys := []*sql.UniqueKey{ + {Columns: *sql.NewColumnList([]string{"id", "item_id"})}, + {Columns: *sql.NewColumnList([]string{"id", "org_id"})}, + {Columns: *sql.NewColumnList([]string{"item_id", "user_id"})}, + } + inspector := &Inspector{} + sharedUniqKeys := inspector.getSharedUniqueKeys(origUniqKeys, ghostUniqKeys) + test.S(t).ExpectEquals(len(sharedUniqKeys), 2) + test.S(t).ExpectEquals(sharedUniqKeys[0].Columns.String(), "id,item_id") + test.S(t).ExpectEquals(sharedUniqKeys[1].Columns.String(), "id,org_id") +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e17f891a2..7256388a8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -7,6 +7,7 @@ package logic import ( "context" + "errors" "fmt" "io" "math" @@ -21,6 +22,10 @@ import ( "github.com/github/gh-ost/go/sql" ) +var ( + ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. 
This is not supported, and you should run your RENAME outside gh-ost.") +) + type ChangelogState string const ( @@ -92,6 +97,7 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { migrator := &Migrator{ appVersion: appVersion, + hooksExecutor: NewHooksExecutor(context), migrationContext: context, parser: sql.NewAlterTableParser(), ghostTableMigrated: make(chan bool), @@ -107,15 +113,6 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { return migrator } -// initiateHooksExecutor -func (this *Migrator) initiateHooksExecutor() (err error) { - this.hooksExecutor = NewHooksExecutor(this.migrationContext) - if err := this.hooksExecutor.initHooks(); err != nil { - return err - } - return nil -} - // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { @@ -224,28 +221,22 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er case Migrated, ReadMigrationRangeValues: // no-op event case GhostTableMigrated: - { - this.ghostTableMigrated <- true - } + this.ghostTableMigrated <- true case AllEventsUpToLockProcessed: - { - var applyEventFunc tableWriteFunc = func() error { - this.allEventsUpToLockProcessed <- changelogStateString - return nil - } - // at this point we know all events up to lock have been read from the streamer, - // because the streamer works sequentially. So those events are either already handled, - // or have event functions in applyEventsQueue. - // So as not to create a potential deadlock, we write this func to applyEventsQueue - // asynchronously, understanding it doesn't really matter. 
- go func() { - this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) - }() + var applyEventFunc tableWriteFunc = func() error { + this.allEventsUpToLockProcessed <- changelogStateString + return nil } + // at this point we know all events up to lock have been read from the streamer, + // because the streamer works sequentially. So those events are either already handled, + // or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() default: - { - return fmt.Errorf("Unknown changelog state: %+v", changelogState) - } + return fmt.Errorf("Unknown changelog state: %+v", changelogState) } this.migrationContext.Log.Infof("Handled changelog state %s", changelogState) return nil @@ -270,13 +261,13 @@ func (this *Migrator) listenOnPanicAbort() { this.teardown() } -// validateStatement validates the `alter` statement meets criteria. +// validateAlterStatement validates the `alter` statement meets criteria. // At this time this means: // - column renames are approved // - no table rename allowed -func (this *Migrator) validateStatement() (err error) { +func (this *Migrator) validateAlterStatement() (err error) { if this.parser.IsRenameTable() { - return fmt.Errorf("ALTER statement seems to RENAME the table. 
This is not supported, and you should run your RENAME outside gh-ost.") + return ErrMigratorUnsupportedRenameAlter } if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns { this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames() @@ -346,16 +337,13 @@ func (this *Migrator) Migrate() (err error) { go this.listenOnPanicAbort() - if err := this.initiateHooksExecutor(); err != nil { - return err - } if err := this.hooksExecutor.onStartup(); err != nil { return err } if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil { return err } - if err := this.validateStatement(); err != nil { + if err := this.validateAlterStatement(); err != nil { return err } @@ -375,6 +363,17 @@ func (this *Migrator) Migrate() (err error) { if err := this.createFlagFiles(); err != nil { return err } + // In MySQL 8.0 (and possibly earlier) some DDL statements can be applied instantly. + // Attempt to do this if AttemptInstantDDL is set. + if this.migrationContext.AttemptInstantDDL { + this.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT") + if err := this.applier.AttemptInstantDDL(); err == nil { + this.migrationContext.Log.Infof("Success! table %s.%s migrated instantly", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil + } else { + this.migrationContext.Log.Infof("ALGORITHM=INSTANT not supported for this operation, proceeding with original algorithm: %s", err) + } + } initialLag, _ := this.inspector.getReplicationLag() this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. 
Current lag is %+v", initialLag) @@ -406,9 +405,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.applier.ReadMigrationRangeValues(); err != nil { return err } - if err := this.initiateThrottler(); err != nil { - return err - } + + this.initiateThrottler() + if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { return err } @@ -904,72 +903,49 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { } } -// printStatus prints the progress status, and optionally additionally detailed -// dump of configuration. -// `rule` indicates the type of output expected. -// By default the status is written to standard output, but other writers can -// be used as well. -func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { - if rule == NoPrintStatusRule { - return - } - writers = append(writers, os.Stdout) - - elapsedTime := this.migrationContext.ElapsedTime() - elapsedSeconds := int64(elapsedTime.Seconds()) - totalRowsCopied := this.migrationContext.GetTotalRowsCopied() - rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate) - if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { - // Done copying rows. The totalRowsCopied value is the de-facto number of rows, - // and there is no further need to keep updating the value. - rowsEstimate = totalRowsCopied - } - var progressPct float64 - if rowsEstimate == 0 { - progressPct = 100.0 - } else { - progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) - } - // we take the opportunity to update migration context with progressPct - this.migrationContext.SetProgressPct(progressPct) - // Before status, let's see if we should print a nice reminder for what exactly we're doing here. 
- shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0) - if rule == ForcePrintStatusAndHintRule { - shouldPrintMigrationStatusHint = true - } - if rule == ForcePrintStatusOnlyRule { - shouldPrintMigrationStatusHint = false - } - if shouldPrintMigrationStatusHint { - this.printMigrationStatusHint(writers...) +// getProgressPercent returns an estimate of migration progress as a percent. +func (this *Migrator) getProgressPercent(rowsEstimate int64) (progressPct float64) { + progressPct = 100.0 + if rowsEstimate > 0 { + progressPct *= float64(this.migrationContext.GetTotalRowsCopied()) / float64(rowsEstimate) } + return progressPct +} - var etaSeconds float64 = math.MaxFloat64 - var etaDuration = time.Duration(base.ETAUnknown) +// getMigrationETA returns the estimated duration of the migration +func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration time.Duration) { + duration = time.Duration(base.ETAUnknown) + progressPct := this.getProgressPercent(rowsEstimate) if progressPct >= 100.0 { - etaDuration = 0 + duration = 0 } else if progressPct >= 0.1 { + totalRowsCopied := this.migrationContext.GetTotalRowsCopied() elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) - etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds + etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds if etaSeconds >= 0 { - etaDuration = time.Duration(etaSeconds) * time.Second + duration = time.Duration(etaSeconds) * time.Second } else { - etaDuration = 0 + duration = 0 } } - this.migrationContext.SetETADuration(etaDuration) - var eta string - switch etaDuration { + + switch duration { case 0: eta = "due" case time.Duration(base.ETAUnknown): eta = "N/A" default: - eta = base.PrettifyDurationOutput(etaDuration) + eta = base.PrettifyDurationOutput(duration) } - state := "migrating" + return eta, duration +} + +// getMigrationStateAndETA 
returns the state and eta of the migration. +func (this *Migrator) getMigrationStateAndETA(rowsEstimate int64) (state, eta string, etaDuration time.Duration) { + eta, etaDuration = this.getMigrationETA(rowsEstimate) + state = "migrating" if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows { state = "counting rows" } else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { @@ -978,27 +954,78 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled { state = fmt.Sprintf("throttled, %s", throttleReason) } + return state, eta, etaDuration +} - shouldPrintStatus := false - if rule == HeuristicPrintStatusRule { - if elapsedSeconds <= 60 { - shouldPrintStatus = true - } else if etaSeconds <= 60 { - shouldPrintStatus = true - } else if etaSeconds <= 180 { - shouldPrintStatus = (elapsedSeconds%5 == 0) - } else if elapsedSeconds <= 180 { - shouldPrintStatus = (elapsedSeconds%5 == 0) - } else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 { - shouldPrintStatus = (elapsedSeconds%5 == 0) - } else { - shouldPrintStatus = (elapsedSeconds%30 == 0) - } +// shouldPrintStatus returns true when the migrator is due to print status info. 
+func (this *Migrator) shouldPrintStatus(rule PrintStatusRule, elapsedSeconds int64, etaDuration time.Duration) (shouldPrint bool) { + if rule != HeuristicPrintStatusRule { + return true + } + + etaSeconds := etaDuration.Seconds() + if elapsedSeconds <= 60 { + shouldPrint = true + } else if etaSeconds <= 60 { + shouldPrint = true + } else if etaSeconds <= 180 { + shouldPrint = (elapsedSeconds%5 == 0) + } else if elapsedSeconds <= 180 { + shouldPrint = (elapsedSeconds%5 == 0) + } else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 { + shouldPrint = (elapsedSeconds%5 == 0) } else { - // Not heuristic - shouldPrintStatus = true + shouldPrint = (elapsedSeconds%30 == 0) } - if !shouldPrintStatus { + + return shouldPrint +} + +// shouldPrintMigrationStatusHint returns true when the migrator is due to print the migration status hint +func (this *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elapsedSeconds int64) (shouldPrint bool) { + if elapsedSeconds%600 == 0 { + shouldPrint = true + } else if rule == ForcePrintStatusAndHintRule { + shouldPrint = true + } + return shouldPrint +} + +// printStatus prints the progress status, and optionally additionally detailed +// dump of configuration. +// `rule` indicates the type of output expected. +// By default the status is written to standard output, but other writers can +// be used as well. +func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { + if rule == NoPrintStatusRule { + return + } + writers = append(writers, os.Stdout) + + elapsedTime := this.migrationContext.ElapsedTime() + elapsedSeconds := int64(elapsedTime.Seconds()) + totalRowsCopied := this.migrationContext.GetTotalRowsCopied() + rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate) + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { + // Done copying rows. 
The totalRowsCopied value is the de-facto number of rows, + // and there is no further need to keep updating the value. + rowsEstimate = totalRowsCopied + } + + // we take the opportunity to update migration context with progressPct + progressPct := this.getProgressPercent(rowsEstimate) + this.migrationContext.SetProgressPct(progressPct) + + // Before status, let's see if we should print a nice reminder for what exactly we're doing here. + if this.shouldPrintMigrationStatusHint(rule, elapsedSeconds) { + this.printMigrationStatusHint(writers...) + } + + // Get state + ETA + state, eta, etaDuration := this.getMigrationStateAndETA(rowsEstimate) + this.migrationContext.SetETADuration(etaDuration) + + if !this.shouldPrintStatus(rule, elapsedSeconds, etaDuration) { return } @@ -1017,10 +1044,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { ) this.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), - status, + state, ) w := io.MultiWriter(writers...) fmt.Fprintln(w, status) + this.migrationContext.Log.Infof(status) if this.migrationContext.HooksStatusIntervalSec > 0 && elapsedSeconds%this.migrationContext.HooksStatusIntervalSec == 0 { this.hooksExecutor.onStatus(status) @@ -1080,7 +1108,7 @@ func (this *Migrator) addDMLEventsListener() error { } // initiateThrottler kicks in the throttling collection and the throttling checks. 
-func (this *Migrator) initiateThrottler() error { +func (this *Migrator) initiateThrottler() { this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) @@ -1090,8 +1118,6 @@ func (this *Migrator) initiateThrottler() error { <-this.firstThrottlingCollected // other, general metrics this.migrationContext.Log.Infof("First throttle metrics collected") go this.throttler.initiateThrottlerChecks() - - return nil } func (this *Migrator) initiateApplier() error { @@ -1298,7 +1324,7 @@ func (this *Migrator) executeWriteFuncs() error { if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 { copyRowsDuration := time.Since(copyRowsStartTime) sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) - sleepTime := time.Duration(time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond) + sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond time.Sleep(sleepTime) } } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go new file mode 100644 index 000000000..242a749a5 --- /dev/null +++ b/go/logic/migrator_test.go @@ -0,0 +1,256 @@ +/* + Copyright 2022 GitHub Inc. 
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "errors" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/openark/golib/tests" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/sql" +) + +func TestMigratorOnChangelogEvent(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + + t.Run("heartbeat", func(t *testing.T) { + columnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "heartbeat", + "2022-08-16T00:45:10.52Z", + }) + tests.S(t).ExpectNil(migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + })) + }) + + t.Run("state-AllEventsUpToLockProcessed", func(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + es := <-migrator.applyEventsQueue + tests.S(t).ExpectNotNil(es) + tests.S(t).ExpectNotNil(es.writeFunc) + }(&wg) + + columnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "state", + AllEventsUpToLockProcessed, + }) + tests.S(t).ExpectNil(migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + })) + wg.Wait() + }) + + t.Run("state-GhostTableMigrated", func(t *testing.T) { + go func() { + tests.S(t).ExpectTrue(<-migrator.ghostTableMigrated) + }() + + columnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "state", + GhostTableMigrated, + }) + tests.S(t).ExpectNil(migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + })) + }) + + t.Run("state-Migrated", func(t *testing.T) { + columnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "state", + Migrated, + }) 
+ tests.S(t).ExpectNil(migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + })) + }) + + t.Run("state-ReadMigrationRangeValues", func(t *testing.T) { + columnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "state", + ReadMigrationRangeValues, + }) + tests.S(t).ExpectNil(migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues, + })) + }) +} + +func TestMigratorValidateStatement(t *testing.T) { + t.Run("add-column", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + tests.S(t).ExpectNil(migrator.parser.ParseAlterStatement(`ALTER TABLE test ADD test_new VARCHAR(64) NOT NULL`)) + + tests.S(t).ExpectNil(migrator.validateAlterStatement()) + tests.S(t).ExpectEquals(len(migrator.migrationContext.DroppedColumnsMap), 0) + }) + + t.Run("drop-column", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + tests.S(t).ExpectNil(migrator.parser.ParseAlterStatement(`ALTER TABLE test DROP abc`)) + + tests.S(t).ExpectNil(migrator.validateAlterStatement()) + tests.S(t).ExpectEquals(len(migrator.migrationContext.DroppedColumnsMap), 1) + _, exists := migrator.migrationContext.DroppedColumnsMap["abc"] + tests.S(t).ExpectTrue(exists) + }) + + t.Run("rename-column", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + tests.S(t).ExpectNil(migrator.parser.ParseAlterStatement(`ALTER TABLE test CHANGE test123 test1234 bigint unsigned`)) + + err := migrator.validateAlterStatement() + tests.S(t).ExpectNotNil(err) + tests.S(t).ExpectTrue(strings.HasPrefix(err.Error(), "gh-ost believes the ALTER statement renames columns")) + tests.S(t).ExpectEquals(len(migrator.migrationContext.DroppedColumnsMap), 0) + 
}) + + t.Run("rename-column-approved", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + migrator.migrationContext.ApproveRenamedColumns = true + tests.S(t).ExpectNil(migrator.parser.ParseAlterStatement(`ALTER TABLE test CHANGE test123 test1234 bigint unsigned`)) + + tests.S(t).ExpectNil(migrator.validateAlterStatement()) + tests.S(t).ExpectEquals(len(migrator.migrationContext.DroppedColumnsMap), 0) + }) + + t.Run("rename-table", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + tests.S(t).ExpectNil(migrator.parser.ParseAlterStatement(`ALTER TABLE test RENAME TO test_new`)) + + err := migrator.validateAlterStatement() + tests.S(t).ExpectNotNil(err) + tests.S(t).ExpectTrue(errors.Is(err, ErrMigratorUnsupportedRenameAlter)) + tests.S(t).ExpectEquals(len(migrator.migrationContext.DroppedColumnsMap), 0) + }) +} + +func TestMigratorCreateFlagFiles(t *testing.T) { + tmpdir, err := os.MkdirTemp("", t.Name()) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpdir) + + migrationContext := base.NewMigrationContext() + migrationContext.PostponeCutOverFlagFile = filepath.Join(tmpdir, "cut-over.flag") + migrator := NewMigrator(migrationContext, "1.2.3") + tests.S(t).ExpectNil(migrator.createFlagFiles()) + tests.S(t).ExpectNil(migrator.createFlagFiles()) // twice to test already-exists + + _, err = os.Stat(migrationContext.PostponeCutOverFlagFile) + tests.S(t).ExpectNil(err) +} + +func TestMigratorGetProgressPercent(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + + { + tests.S(t).ExpectEquals(migrator.getProgressPercent(0), float64(100.0)) + } + { + migrationContext.TotalRowsCopied = 250 + tests.S(t).ExpectEquals(migrator.getProgressPercent(1000), float64(25.0)) + } +} + +func TestMigratorGetMigrationStateAndETA(t *testing.T) { + migrationContext := 
base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + now := time.Now() + migrationContext.RowCopyStartTime = now.Add(-time.Minute) + migrationContext.RowCopyEndTime = now + + { + migrationContext.TotalRowsCopied = 456 + state, eta, etaDuration := migrator.getMigrationStateAndETA(123456) + tests.S(t).ExpectEquals(state, "migrating") + tests.S(t).ExpectEquals(eta, "4h29m44s") + tests.S(t).ExpectEquals(etaDuration.String(), "4h29m44s") + } + { + migrationContext.TotalRowsCopied = 456 + state, eta, etaDuration := migrator.getMigrationStateAndETA(456) + tests.S(t).ExpectEquals(state, "migrating") + tests.S(t).ExpectEquals(eta, "due") + tests.S(t).ExpectEquals(etaDuration.String(), "0s") + } + { + migrationContext.TotalRowsCopied = 123456 + state, eta, etaDuration := migrator.getMigrationStateAndETA(456) + tests.S(t).ExpectEquals(state, "migrating") + tests.S(t).ExpectEquals(eta, "due") + tests.S(t).ExpectEquals(etaDuration.String(), "0s") + } + { + atomic.StoreInt64(&migrationContext.CountingRowsFlag, 1) + state, eta, etaDuration := migrator.getMigrationStateAndETA(123456) + tests.S(t).ExpectEquals(state, "counting rows") + tests.S(t).ExpectEquals(eta, "due") + tests.S(t).ExpectEquals(etaDuration.String(), "0s") + } + { + atomic.StoreInt64(&migrationContext.CountingRowsFlag, 0) + atomic.StoreInt64(&migrationContext.IsPostponingCutOver, 1) + state, eta, etaDuration := migrator.getMigrationStateAndETA(123456) + tests.S(t).ExpectEquals(state, "postponing cut-over") + tests.S(t).ExpectEquals(eta, "due") + tests.S(t).ExpectEquals(etaDuration.String(), "0s") + } +} + +func TestMigratorShouldPrintStatus(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.2.3") + + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(NoPrintStatusRule, 10, time.Second)) // test 'rule != HeuristicPrintStatusRule' return + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 10, 
time.Second)) // test 'etaDuration.Seconds() <= 60' + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 90, time.Second)) // test 'etaDuration.Seconds() <= 60' again + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 90, time.Minute)) // test 'etaDuration.Seconds() <= 180' + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 60, 90*time.Second)) // test 'elapsedSeconds <= 180' + tests.S(t).ExpectFalse(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 61, 90*time.Second)) // test 'elapsedSeconds <= 180' + tests.S(t).ExpectFalse(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 99, 210*time.Second)) // test 'elapsedSeconds <= 180' + tests.S(t).ExpectFalse(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 12345, 86400*time.Second)) // test 'else' + tests.S(t).ExpectTrue(migrator.shouldPrintStatus(HeuristicPrintStatusRule, 30030, 86400*time.Second)) // test 'else' again +} diff --git a/go/logic/server.go b/go/logic/server.go index 5356a8227..4b1b87023 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -1,5 +1,5 @@ /* - Copyright 2021 GitHub Inc. + Copyright 2022 GitHub Inc. 
See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -122,8 +122,6 @@ func (this *Server) onServerCommand(command string, writer *bufio.Writer) (err e // applyServerCommand parses and executes commands by user func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (printStatusRule PrintStatusRule, err error) { - printStatusRule = NoPrintStatusRule - tokens := strings.SplitN(command, "=", 2) command = strings.TrimSpace(tokens[0]) arg := "" diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 3e4d2be13..a7a0d7d89 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -59,7 +59,6 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer // AddListener registers a new listener for binlog events, on a per-table basis func (this *EventsStreamer) AddListener( async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { - this.listenersMutex.Lock() defer this.listenersMutex.Unlock() diff --git a/go/logic/throttler.go b/go/logic/throttler.go index ea39725fb..52c5c979f 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -180,7 +180,6 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo // collectControlReplicasLag polls all the control replicas to get maximum lag value func (this *Throttler) collectControlReplicasLag() { - if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 { return } @@ -308,6 +307,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- if err != nil { return false, err } + defer resp.Body.Close() + atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode)) return false, nil } diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 1c24a3417..6250925b7 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. 
See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -12,6 +12,7 @@ import ( "fmt" "io/ioutil" "net" + "strings" "github.com/go-sql-driver/mysql" ) @@ -22,12 +23,13 @@ const ( // ConnectionConfig is the minimal configuration required to connect to a MySQL server type ConnectionConfig struct { - Key InstanceKey - User string - Password string - ImpliedKey *InstanceKey - tlsConfig *tls.Config - Timeout float64 + Key InstanceKey + User string + Password string + ImpliedKey *InstanceKey + tlsConfig *tls.Config + Timeout float64 + TransactionIsolation string } func NewConnectionConfig() *ConnectionConfig { @@ -41,11 +43,12 @@ func NewConnectionConfig() *ConnectionConfig { // DuplicateCredentials creates a new connection config with given key and with same credentials as this config func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionConfig { config := &ConnectionConfig{ - Key: key, - User: this.User, - Password: this.Password, - tlsConfig: this.tlsConfig, - Timeout: this.Timeout, + Key: key, + User: this.User, + Password: this.Password, + tlsConfig: this.tlsConfig, + Timeout: this.Timeout, + TransactionIsolation: this.TransactionIsolation, } config.ImpliedKey = &config.Key return config @@ -112,12 +115,23 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { // Wrap IPv6 literals in square brackets hostname = fmt.Sprintf("[%s]", hostname) } - interpolateParams := true + // go-mysql-driver defaults to false if tls param is not provided; explicitly setting here to // simplify construction of the DSN below. 
tlsOption := "false" if this.tlsConfig != nil { tlsOption = TLS_CONFIG_KEY } - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=%fs&readTimeout=%fs&writeTimeout=%fs&interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s", this.User, this.Password, hostname, this.Key.Port, databaseName, this.Timeout, this.Timeout, this.Timeout, interpolateParams, tlsOption) + connectionParams := []string{ + "autocommit=true", + "charset=utf8mb4,utf8,latin1", + "interpolateParams=true", + fmt.Sprintf("tls=%s", tlsOption), + fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation), + fmt.Sprintf("timeout=%fs", this.Timeout), + fmt.Sprintf("readTimeout=%fs", this.Timeout), + fmt.Sprintf("writeTimeout=%fs", this.Timeout), + } + + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?%s", this.User, this.Password, hostname, this.Key.Port, databaseName, strings.Join(connectionParams, "&")) } diff --git a/go/mysql/connection_test.go b/go/mysql/connection_test.go index f9c45de85..5667235f5 100644 --- a/go/mysql/connection_test.go +++ b/go/mysql/connection_test.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. 
See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -13,6 +13,10 @@ import ( test "github.com/openark/golib/tests" ) +const ( + transactionIsolation = "REPEATABLE-READ" +) + func init() { log.SetLevel(log.ERROR) } @@ -25,6 +29,7 @@ func TestNewConnectionConfig(t *testing.T) { test.S(t).ExpectEquals(c.ImpliedKey.Port, 0) test.S(t).ExpectEquals(c.User, "") test.S(t).ExpectEquals(c.Password, "") + test.S(t).ExpectEquals(c.TransactionIsolation, "") } func TestDuplicateCredentials(t *testing.T) { @@ -36,6 +41,7 @@ func TestDuplicateCredentials(t *testing.T) { InsecureSkipVerify: true, ServerName: "feathers", } + c.TransactionIsolation = transactionIsolation dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310}) test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost") @@ -45,6 +51,7 @@ func TestDuplicateCredentials(t *testing.T) { test.S(t).ExpectEquals(dup.User, "gromit") test.S(t).ExpectEquals(dup.Password, "penguin") test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig) + test.S(t).ExpectEquals(dup.TransactionIsolation, c.TransactionIsolation) } func TestDuplicate(t *testing.T) { @@ -52,6 +59,7 @@ func TestDuplicate(t *testing.T) { c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" + c.TransactionIsolation = transactionIsolation dup := c.Duplicate() test.S(t).ExpectEquals(dup.Key.Hostname, "myhost") @@ -60,6 +68,7 @@ func TestDuplicate(t *testing.T) { test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3306) test.S(t).ExpectEquals(dup.User, "gromit") test.S(t).ExpectEquals(dup.Password, "penguin") + test.S(t).ExpectEquals(dup.TransactionIsolation, transactionIsolation) } func TestGetDBUri(t *testing.T) { @@ -67,9 +76,11 @@ func TestGetDBUri(t *testing.T) { c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" + c.Timeout = 1.2345 + c.TransactionIsolation = transactionIsolation uri := c.GetDBUri("test") - test.S(t).ExpectEquals(uri, 
"gromit:penguin@tcp(myhost:3306)/test?timeout=0.000000s&readTimeout=0.000000s&writeTimeout=0.000000s&interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=false") + test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`) } func TestGetDBUriWithTLSSetup(t *testing.T) { @@ -77,8 +88,10 @@ func TestGetDBUriWithTLSSetup(t *testing.T) { c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.User = "gromit" c.Password = "penguin" + c.Timeout = 1.2345 c.tlsConfig = &tls.Config{} + c.TransactionIsolation = transactionIsolation uri := c.GetDBUri("test") - test.S(t).ExpectEquals(uri, "gromit:penguin@tcp(myhost:3306)/test?timeout=0.000000s&readTimeout=0.000000s&writeTimeout=0.000000s&interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=ghost") + test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`) } diff --git a/go/mysql/instance_key.go b/go/mysql/instance_key.go index 679bdc9f0..3d2bff114 100644 --- a/go/mysql/instance_key.go +++ b/go/mysql/instance_key.go @@ -35,8 +35,7 @@ const detachHint = "//" // ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 func NewRawInstanceKey(hostPort string) (*InstanceKey, error) { - hostname := "" - port := "" + var hostname, port string if submatch := ipv4HostPortRegexp.FindStringSubmatch(hostPort); len(submatch) > 0 { hostname = submatch[1] port = submatch[2] diff --git a/go/sql/builder.go b/go/sql/builder.go index b24a848d1..0169390c2 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -167,7 +167,7 @@ func BuildRangeComparison(columns []string, values []string, args 
[]interface{}, if includeEquals { comparison, err := BuildEqualsComparison(columns, values) if err != nil { - return "", explodedArgs, nil + return "", explodedArgs, err } comparisons = append(comparisons, comparison) explodedArgs = append(explodedArgs, args...) @@ -505,13 +505,13 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol return "", sharedArgs, uniqueKeyArgs, err } result = fmt.Sprintf(` - update /* gh-ost %s.%s */ - %s.%s + update /* gh-ost %s.%s */ + %s.%s set %s where - %s - `, databaseName, tableName, + %s + `, databaseName, tableName, databaseName, tableName, setClause, equalsComparison, diff --git a/go/sql/parser.go b/go/sql/parser.go index eac0bdce3..2ddc60f50 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -62,7 +62,7 @@ func NewParserFromAlterStatement(alterStatement string) *AlterTableParser { return parser } -func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { +func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string) { terminatingQuote := rune(0) f := func(c rune) bool { switch { @@ -86,7 +86,7 @@ func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tok for i := range tokens { tokens[i] = strings.TrimSpace(tokens[i]) } - return tokens, nil + return tokens } func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { @@ -95,7 +95,7 @@ func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement st return strippedStatement } -func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { +func (this *AlterTableParser) parseAlterToken(alterToken string) { { // rename allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1) @@ -131,11 +131,9 @@ func 
(this *AlterTableParser) parseAlterToken(alterToken string) (err error) { this.isAutoIncrementDefined = true } } - return nil } func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { - this.alterStatementOptions = alterStatement for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps { if submatch := alterTableRegexp.FindStringSubmatch(this.alterStatementOptions); len(submatch) > 0 { @@ -152,8 +150,7 @@ func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err er break } } - alterTokens, _ := this.tokenizeAlterStatement(this.alterStatementOptions) - for _, alterToken := range alterTokens { + for _, alterToken := range this.tokenizeAlterStatement(this.alterStatementOptions) { alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) this.parseAlterToken(alterToken) this.alterTokens = append(this.alterTokens, alterToken) diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go index 645989d76..df9284280 100644 --- a/go/sql/parser_test.go +++ b/go/sql/parser_test.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. 
See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -40,7 +40,6 @@ func TestParseAlterStatementTrivialRename(t *testing.T) { } func TestParseAlterStatementWithAutoIncrement(t *testing.T) { - statements := []string{ "auto_increment=7", "auto_increment = 7", @@ -100,37 +99,37 @@ func TestTokenizeAlterStatement(t *testing.T) { parser := NewAlterTableParser() { alterStatement := "add column t int" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int"})) } { alterStatement := "add column t int, change column i int" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"})) } { alterStatement := "add column t int, change column i int 'some comment'" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"})) } { alterStatement := "add column t int, change column i int 'some comment, with comma'" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"})) } { alterStatement := "add column t int, add column d decimal(10,2)" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"})) } { alterStatement := "add column t int, add column e enum('a','b','c')" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := 
parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"})) } { alterStatement := "add column t int(11), add column e enum('a','b','c')" - tokens, _ := parser.tokenizeAlterStatement(alterStatement) + tokens := parser.tokenizeAlterStatement(alterStatement) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"})) } } @@ -150,7 +149,6 @@ func TestSanitizeQuotesFromAlterStatement(t *testing.T) { } func TestParseAlterStatementDroppedColumns(t *testing.T) { - { parser := NewAlterTableParser() statement := "drop column b" @@ -190,7 +188,6 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { } func TestParseAlterStatementRenameTable(t *testing.T) { - { parser := NewAlterTableParser() statement := "drop column b" @@ -230,7 +227,6 @@ func TestParseAlterStatementRenameTable(t *testing.T) { } func TestParseAlterStatementExplicitTable(t *testing.T) { - { parser := NewAlterTableParser() statement := "drop column b" diff --git a/go/sql/types.go b/go/sql/types.go index 3c4ce5e85..3be1a44ca 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -32,6 +32,11 @@ type TimezoneConversion struct { ToTimezone string } +type CharacterSetConversion struct { + ToCharset string + FromCharset string +} + type Column struct { Name string IsUnsigned bool @@ -43,17 +48,22 @@ type Column struct { // add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog. 
// https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint + charsetConversion *CharacterSetConversion } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { if s, ok := arg.(string); ok { - // string, charset conversion - if encoding, ok := charsetEncodingMap[this.Charset]; ok { - arg, _ = encoding.NewDecoder().String(s) + arg2Bytes := []byte(s) + // convert to bytes if character string without charsetConversion. + if this.Charset != "" && this.charsetConversion == nil { + arg = arg2Bytes + } else { + if encoding, ok := charsetEncodingMap[this.Charset]; ok { + arg, _ = encoding.NewDecoder().String(s) + } } if this.Type == BinaryColumnType && isUniqueKeyColumn { - arg2Bytes := []byte(arg.(string)) size := len(arg2Bytes) if uint(size) < this.BinaryOctetLength { buf := bytes.NewBuffer(arg2Bytes) @@ -238,6 +248,10 @@ func (this *ColumnList) Len() int { return len(this.columns) } +func (this *ColumnList) SetCharsetConversion(columnName string, fromCharset string, toCharset string) { + this.GetColumn(columnName).charsetConversion = &CharacterSetConversion{FromCharset: fromCharset, ToCharset: toCharset} +} + // UniqueKey is the combination of a key's name and columns type UniqueKey struct { Name string diff --git a/localtests/attempt-instant-ddl/create.sql b/localtests/attempt-instant-ddl/create.sql new file mode 100644 index 000000000..9371238b7 --- /dev/null +++ b/localtests/attempt-instant-ddl/create.sql @@ -0,0 +1,13 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + color varchar(32), + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; + +insert into gh_ost_test values (null, 11, 'red'); +insert into gh_ost_test values (null, 13, 'green'); +insert into gh_ost_test values (null, 17, 'blue'); diff --git a/localtests/attempt-instant-ddl/extra_args b/localtests/attempt-instant-ddl/extra_args new file mode 100644 index 000000000..70c8a5211 
--- /dev/null +++ b/localtests/attempt-instant-ddl/extra_args @@ -0,0 +1 @@ +--attempt-instant-ddl diff --git a/localtests/convert-utf8mb4/create.sql b/localtests/convert-utf8mb4/create.sql index 05f1a1380..e35e6889f 100644 --- a/localtests/convert-utf8mb4/create.sql +++ b/localtests/convert-utf8mb4/create.sql @@ -7,9 +7,6 @@ create table gh_ost_test ( primary key(id) ) auto_increment=1; -insert into gh_ost_test values (null, 'átesting'); - - insert into gh_ost_test values (null, 'Hello world, Καλημέρα κόσμε, コンニチハ', 'átesting0', 'initial'); drop event if exists gh_ost_test; diff --git a/localtests/datetime-with-zero/create.sql b/localtests/datetime-with-zero/create.sql new file mode 100644 index 000000000..526d1e6a4 --- /dev/null +++ b/localtests/datetime-with-zero/create.sql @@ -0,0 +1,20 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int unsigned auto_increment, + i int not null, + dt datetime, + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 7, '2010-10-20 10:20:30'); +end ;; diff --git a/localtests/datetime-with-zero/extra_args b/localtests/datetime-with-zero/extra_args new file mode 100644 index 000000000..0d60fb447 --- /dev/null +++ b/localtests/datetime-with-zero/extra_args @@ -0,0 +1 @@ +--allow-zero-in-date --alter="change column dt dt datetime not null default '1970-00-00 00:00:00'" diff --git a/localtests/discard-fk/ignore_versions b/localtests/discard-fk/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/discard-fk/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/existing-datetime-with-zero/create.sql b/localtests/existing-datetime-with-zero/create.sql new file mode 100644 index 
000000000..5320d2cce --- /dev/null +++ b/localtests/existing-datetime-with-zero/create.sql @@ -0,0 +1,21 @@ +set session sql_mode=''; +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int unsigned auto_increment, + i int not null, + dt datetime not null default '1970-00-00 00:00:00', + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 7, '2010-10-20 10:20:30'); +end ;; diff --git a/localtests/existing-datetime-with-zero/extra_args b/localtests/existing-datetime-with-zero/extra_args new file mode 100644 index 000000000..eb0e2ffdf --- /dev/null +++ b/localtests/existing-datetime-with-zero/extra_args @@ -0,0 +1 @@ +--allow-zero-in-date --alter="engine=innodb" diff --git a/localtests/fail-datetime-with-zero/create.sql b/localtests/fail-datetime-with-zero/create.sql new file mode 100644 index 000000000..526d1e6a4 --- /dev/null +++ b/localtests/fail-datetime-with-zero/create.sql @@ -0,0 +1,20 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int unsigned auto_increment, + i int not null, + dt datetime, + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 7, '2010-10-20 10:20:30'); +end ;; diff --git a/localtests/fail-datetime-with-zero/expect_failure b/localtests/fail-datetime-with-zero/expect_failure new file mode 100644 index 000000000..79356a144 --- /dev/null +++ b/localtests/fail-datetime-with-zero/expect_failure @@ -0,0 +1 @@ +Invalid default value for 'dt' diff --git a/localtests/fail-datetime-with-zero/extra_args 
b/localtests/fail-datetime-with-zero/extra_args new file mode 100644 index 000000000..9b72ac2c8 --- /dev/null +++ b/localtests/fail-datetime-with-zero/extra_args @@ -0,0 +1 @@ +--alter="change column dt dt datetime not null default '1970-00-00 00:00:00'" diff --git a/localtests/fail-existing-datetime-with-zero/create.sql b/localtests/fail-existing-datetime-with-zero/create.sql new file mode 100644 index 000000000..5320d2cce --- /dev/null +++ b/localtests/fail-existing-datetime-with-zero/create.sql @@ -0,0 +1,21 @@ +set session sql_mode=''; +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int unsigned auto_increment, + i int not null, + dt datetime not null default '1970-00-00 00:00:00', + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 7, '2010-10-20 10:20:30'); +end ;; diff --git a/localtests/fail-existing-datetime-with-zero/expect_failure b/localtests/fail-existing-datetime-with-zero/expect_failure new file mode 100644 index 000000000..79356a144 --- /dev/null +++ b/localtests/fail-existing-datetime-with-zero/expect_failure @@ -0,0 +1 @@ +Invalid default value for 'dt' diff --git a/localtests/fail-existing-datetime-with-zero/extra_args b/localtests/fail-existing-datetime-with-zero/extra_args new file mode 100644 index 000000000..31bc4798b --- /dev/null +++ b/localtests/fail-existing-datetime-with-zero/extra_args @@ -0,0 +1 @@ +--alter="engine=innodb" diff --git a/localtests/fail-fk-parent/ignore_versions b/localtests/fail-fk-parent/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/fail-fk-parent/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/fail-fk/ignore_versions b/localtests/fail-fk/ignore_versions new 
file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/fail-fk/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/generated-columns-add/ignore_versions b/localtests/generated-columns-add/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/generated-columns-add/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/generated-columns-rename/ignore_versions b/localtests/generated-columns-rename/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/generated-columns-rename/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/generated-columns-unique/ignore_versions b/localtests/generated-columns-unique/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/generated-columns-unique/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/generated-columns/ignore_versions b/localtests/generated-columns/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/generated-columns/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/geometry/ignore_versions b/localtests/geometry/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/geometry/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/spatial/ignore_versions b/localtests/spatial/ignore_versions new file mode 100644 index 000000000..cf02abe24 --- /dev/null +++ b/localtests/spatial/ignore_versions @@ -0,0 +1 @@ +Percona \ No newline at end of file diff --git a/localtests/test.sh b/localtests/test.sh index f254a5352..36ae3a7b9 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -11,6 +11,7 @@ tests_path=$(dirname $0) test_logfile=/tmp/gh-ost-test.log 
default_ghost_binary=/tmp/gh-ost-test ghost_binary="" +storage_engine=innodb exec_command_file=/tmp/gh-ost-test.bash ghost_structure_output_file=/tmp/gh-ost-test.ghost.structure.sql orig_content_output_file=/tmp/gh-ost-test.orig.content.csv @@ -24,12 +25,13 @@ replica_port= original_sql_mode= OPTIND=1 -while getopts "b:" OPTION +while getopts "b:s:" OPTION do case $OPTION in b) - ghost_binary="$OPTARG" - ;; + ghost_binary="$OPTARG";; + s) + storage_engine="$OPTARG";; esac done shift $((OPTIND-1)) @@ -99,9 +101,13 @@ test_single() { if [ -f $tests_path/$test_name/ignore_versions ] ; then ignore_versions=$(cat $tests_path/$test_name/ignore_versions) mysql_version=$(gh-ost-test-mysql-master -s -s -e "select @@version") + mysql_version_comment=$(gh-ost-test-mysql-master -s -s -e "select @@version_comment") if echo "$mysql_version" | egrep -q "^${ignore_versions}" ; then echo -n "Skipping: $test_name" return 0 + elif echo "$mysql_version_comment" | egrep -i -q "^${ignore_versions}" ; then + echo -n "Skipping: $test_name" + return 0 fi fi @@ -117,6 +123,14 @@ test_single() { fi gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql + test_create_result=$? + + if [ $test_create_result -ne 0 ] ; then + echo + echo "ERROR $test_name create failure. cat $tests_path/$test_name/create.sql:" + cat $tests_path/$test_name/create.sql + return 1 + fi extra_args="" if [ -f $tests_path/$test_name/extra_args ] ; then @@ -146,7 +160,8 @@ test_single() { --assume-master-host=${master_host}:${master_port} --database=test \ --table=gh_ost_test \ - --alter='engine=innodb' \ + --storage-engine=${storage_engine} \ + --alter='engine=${storage_engine}' \ --exact-rowcount \ --assume-rbr \ --initially-drop-old-table \ @@ -255,7 +270,7 @@ build_binary() { test_all() { build_binary - find $tests_path ! -path . -type d -mindepth 1 -maxdepth 1 | cut -d "/" -f 3 | egrep "$test_pattern" | while read test_name ; do + find $tests_path ! -path . 
-type d -mindepth 1 -maxdepth 1 | cut -d "/" -f 3 | egrep "$test_pattern" | sort | while read test_name ; do test_single "$test_name" if [ $? -ne 0 ] ; then create_statement=$(gh-ost-test-mysql-replica test -t -e "show create table \`~gh_ost_test_gho\` \G") diff --git a/script/cibuild-gh-ost-replica-tests b/script/cibuild-gh-ost-replica-tests index c4dbfd292..90eb856bc 100755 --- a/script/cibuild-gh-ost-replica-tests +++ b/script/cibuild-gh-ost-replica-tests @@ -36,8 +36,16 @@ test_mysql_version() { mkdir -p sandbox/binary rm -rf sandbox/binary/* - gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/"$mysql_version".tar.xz --sandbox-binary ${PWD}/sandbox/binary - + local mysql_server=${mysql_version%-*} + if echo "$mysql_server" | egrep -i "percona" ; then + tarball_name=Percona-Server-${mysql_version#*-}-12-Linux.x86_64.glibc2.12-minimal.tar.gz + rm -f gh-ost-ci-env/mysql-tarballs/${tarball_name} + ln -s "$mysql_version".tar.xz gh-ost-ci-env/mysql-tarballs/${tarball_name} + gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/${tarball_name} --sandbox-binary ${PWD}/sandbox/binary + rm -f gh-ost-ci-env/mysql-tarballs/${tarball_name} + else + gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/"$mysql_version".tar.xz --sandbox-binary ${PWD}/sandbox/binary + fi mkdir -p sandboxes rm -rf sandboxes/* @@ -60,9 +68,21 @@ test_mysql_version() { gh-ost-test-mysql-master -uroot -e "create user 'gh-ost'@'%' identified by 'gh-ost'" gh-ost-test-mysql-master -uroot -e "grant all on *.* to 'gh-ost'@'%'" - echo "### Running gh-ost tests for $mysql_version" - ./localtests/test.sh -b bin/gh-ost - + if echo "$mysql_server" | egrep -i "percona" ; then + echo "### Preparing for rocksdb in PerconaServer" + gh-ost-test-mysql-master -uroot -e 'INSTALL PLUGIN ROCKSDB SONAME "ha_rocksdb.so"' + gh-ost-test-mysql-master -uroot -e 'set global default_storage_engine="ROCKSDB"' + gh-ost-test-mysql-master -uroot -e 'set global 
transaction_isolation="READ-COMMITTED"' + gh-ost-test-mysql-replica -uroot -e 'INSTALL PLUGIN ROCKSDB SONAME "ha_rocksdb.so"' + gh-ost-test-mysql-replica -uroot -e 'set global default_storage_engine="ROCKSDB"' + gh-ost-test-mysql-replica -uroot -e 'set global transaction_isolation="READ-COMMITTED"' + + echo "### Running gh-ost tests for $mysql_version" + ./localtests/test.sh -b bin/gh-ost -s rocksdb + else + echo "### Running gh-ost tests for $mysql_version" + ./localtests/test.sh -b bin/gh-ost -s innodb + fi find sandboxes -name "stop_all" | bash } diff --git a/script/ensure-golangci-lint-installed b/script/ensure-golangci-lint-installed new file mode 100755 index 000000000..e4f49f539 --- /dev/null +++ b/script/ensure-golangci-lint-installed @@ -0,0 +1,17 @@ +#!/bin/bash + +# See https://github.com/golangci/golangci-lint/releases +GOLANGCI_RELEASE=v1.46.2 +GOLANGCI_INSTALL_SCRIPT=https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh + +if [ -z "$GOPATH" ]; then + echo "GOPATH must be set" + exit 1 +fi + +if [ ! -x "$GOPATH/bin/golangci-lint" ]; then + echo "Installing golangci-lint $GOLANGCI_RELEASE using script: $GOLANGCI_INSTALL_SCRIPT" + curl -sSfL $GOLANGCI_INSTALL_SCRIPT | sh -s -- -b $(go env GOPATH)/bin $GOLANGCI_RELEASE +fi + +$GOPATH/bin/golangci-lint --version diff --git a/script/lint b/script/lint new file mode 100755 index 000000000..e29aa8be5 --- /dev/null +++ b/script/lint @@ -0,0 +1,15 @@ +#!/bin/bash + +set -e + +. script/ensure-go-installed +. script/ensure-golangci-lint-installed + +if [ -x "$GOPATH/bin/golangci-lint" ]; then + echo "Running golangci-lint run" + $GOPATH/bin/golangci-lint run --config=.golangci.yml + echo "Done, exit code: $?" 
+else + echo "ERROR: cannot find golangci-lint at $GOPATH/bin" + exit 1 +fi diff --git a/script/test b/script/test index 7e757b516..5c32b370c 100755 --- a/script/test +++ b/script/test @@ -14,4 +14,4 @@ script/build cd .gopath/src/github.com/github/gh-ost echo "Running unit tests" -go test ./go/... +go test -v -covermode=atomic ./go/...