From a49c4d1cc40774184cecf85412232dd4aaa3b1ea Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Apr 2021 18:31:14 +0800 Subject: [PATCH 01/11] support tidb region dumping except partitions --- v4/export/dump.go | 101 ++++++++++++++++++++++++++++++++++++++---- v4/export/sql_type.go | 51 +++++++++++---------- 2 files changed, 121 insertions(+), 31 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index e3d72749..5cb0fab2 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -441,10 +441,8 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met db, tbl := meta.DatabaseName(), meta.TableName() if conf.ServerInfo.ServerType == ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && - conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 { - tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", - zap.String("database", db), zap.String("table", tbl)) - return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) + conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 { + return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan, conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0) } field, err := pickupPossibleField(db, tbl, conn, conf) if err != nil { @@ -571,11 +569,24 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string) return min, max, nil } -func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error { +func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, useTableSample bool) error { conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() - handleColNames, handleVals, err := selectTiDBTableSample(conn, db, tbl) + var ( + handleColNames []string + handleVals [][]string + err error + ) + if useTableSample { + tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", + zap.String("database", db), zap.String("table", tbl)) + handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl) + } else { + tctx.L().Debug("dumping TiDB tables with TABLE REGIONS", + zap.String("database", db), zap.String("table", tbl)) + handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl) + } if err != nil { return err } @@ -605,7 +616,7 @@ func (d *Dumper) L() log.Logger { return d.tctx.L() } -func selectTiDBTableSample(conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { +func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { pkFields, pkColTypes, err := GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) if err != nil { return nil, nil, errors.Trace(err) @@ -621,7 +632,7 @@ func selectTiDBTableSample(conn *sql.Conn, dbName, tableName string) (pkFields [ return pkFields, pkVals, nil } query := buildTiDBTableSampleQuery(pkFields, dbName, tableName) - rows, err := conn.QueryContext(context.Background(), query) + rows, err := conn.QueryContext(tctx, query) if err != nil { return nil, nil, errors.Trace(err) } @@ -658,6 +669,80 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks) } +func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { + var pkColTypes []string + hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) + if err != nil { + return nil, nil, errors.Trace(err) + } + if hasImplicitRowID { + pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} + } else { + pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) + if err != nil { + return nil, nil, errors.Trace(err) + } + if len(pkFields) != 1 || len(pkColTypes) != 1 { + return nil, nil, errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + } + if _, ok := dataTypeNum[pkColTypes[0]]; !ok { + return nil, nil, errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + } + } + if len(pkFields) == 0 { + return pkFields, pkVals, nil + } + + rows, err := conn.QueryContext(tctx, "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;", dbName, tableName) + if err != nil { + return + } + defer rows.Close() + var ( + startKey, decodedKey sql.NullString + rowID int + ) + logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName)) + for rows.Next() { + rowID++ + err = rows.Scan(&startKey, &decodedKey) + if err != nil { + return + } + // first region's start key has no use. It may come from another table or might be invalid + if rowID == 1 { + continue + } + if !startKey.Valid { + logger.Debug("meet invalid start key", zap.Int("rowID", rowID)) + continue + } + if !decodedKey.Valid { + logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String)) + continue + } + pkVal, err := getTiDBRowIDFromDecodedKey(decodedKey.String) + if err != nil { + logger.Debug("fail to extract pkVal from decoded start key", + zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String)) + } else { + pkVals = append(pkVals, pkVal) + } + } + return +} + +func getTiDBRowIDFromDecodedKey(key string) ([]string, error) { + const ( + tidbRowID = "_tidb_rowid=" + ) + if p := strings.Index(key, tidbRowID); p != -1 { + p += len(tidbRowID) + return []string{key[p:]}, nil + } + return nil, errors.Errorf("decoded key %s doesn't have _tidb_rowid= field", key) +} + func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error { databases, err := prepareDumpingDatabases(conf, db) if err != nil { diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index 6e117f08..db874fa2 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -18,37 +18,42 @@ var ( ) func initColTypeRowReceiverMap() { - for _, s := range dataTypeString { + var dataTypeStringArr = []string{ + "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", + "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", + "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", + "ENUM", "SET", "JSON", + } + + var dataTypeNumArr = []string{ + "INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT", + "INT", "INT1", "INT2", "INT3", "INT8", + "FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION", + "DECIMAL", "NUMERIC", "FIXED", + "BOOL", "BOOLEAN", + } + + var dataTypeBinArr = []string{ + "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", + "BINARY", "VARBINARY", + "BIT", + } + + for _, s := range dataTypeStringArr { + dataTypeString[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeStringMaker } - for _, s := range dataTypeNum { + for _, s := range dataTypeNumArr { + dataTypeNum[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeNumberMaker } - for _, s := range dataTypeBin { + for _, s := range dataTypeBinArr { + dataTypeBin[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeBytesMaker } } -var dataTypeString = []string{ - "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", - "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", - "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", - "ENUM", "SET", "JSON", -} - -var dataTypeNum = []string{ - "INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT", - "INT", "INT1", "INT2", "INT3", "INT8", - "FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION", - "DECIMAL", "NUMERIC", "FIXED", - "BOOL", "BOOLEAN", -} - -var dataTypeBin = []string{ - "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", - "BINARY", "VARBINARY", - "BIT", -} +var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{}) func escapeBackslashSQL(s []byte, bf *bytes.Buffer) { var ( From acba88b73bc7debf94cf40f484acb2972fd7845c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 22 Apr 2021 15:14:52 +0800 Subject: [PATCH 02/11] support split regions for tables with partitions --- v4/export/dump.go | 191 ++++++++++++++++++++++++++++++------------ v4/export/sql.go | 44 +++++++--- v4/export/sql_test.go | 2 +- 3 files changed, 170 insertions(+), 67 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 5cb0fab2..504fb021 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -400,9 +400,9 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta } } -func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error { +func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string) error { conf := d.conf - tableIR, err := SelectAllFromTable(conf, conn, meta) + tableIR, err := SelectAllFromTable(conf, conn, meta, partition) if err != nil { return err } @@ -432,7 +432,7 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.String("database", meta.DatabaseName()), zap.String("table", meta.TableName())) } - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") } // concurrentDumpTable tries to split table into several chunks to dump @@ -452,7 +452,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met // skip split chunk logic if not found proper field tctx.L().Warn("fallback to sequential dump due to no proper field", zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") } min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field) @@ -475,7 +475,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.Uint64("conf.rows", conf.Rows), zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") } // every chunk would have eventual adjustments @@ -506,7 +506,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met for max.Cmp(cutoff) >= 0 { nextCutOff := new(big.Int).Add(cutoff, bigEstimatedStep) where := fmt.Sprintf("%s(`%s` >= %d AND `%s` < %d)", nullValueCondition, escapeString(field), cutoff, escapeString(field), nextCutOff) - query := buildSelectQuery(db, tbl, selectField, buildWhereCondition(conf, where), orderByClause) + query := buildSelectQuery(db, tbl, selectField, "", buildWhereCondition(conf, where), orderByClause) if len(nullValueCondition) > 0 { nullValueCondition = "" } @@ -570,7 +570,6 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string) } func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, useTableSample bool) error { - conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() var ( @@ -585,14 +584,61 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn } else { tctx.L().Debug("dumping TiDB tables with TABLE REGIONS", zap.String("database", db), zap.String("table", tbl)) - handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl) + partitions, err := GetPartitionNames(conn, db, tbl) + if err == nil { + if len(partitions) == 0 { + handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl) + } else { + return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions) + } + } } if err != nil { return err } + return d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1) +} + +func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partitions []string) error { + db, tbl := meta.DatabaseName(), meta.TableName() + tctx.L().Debug("dumping TiDB tables with TABLE REGIONS for partition table", + zap.String("database", db), zap.String("table", tbl), zap.Strings("partitions", partitions)) + + startChunkIdx := 0 + totalChunk := 0 + cachedHandleVals := make([][][]string, len(partitions)) + + handleColNames, _, err := selectTiDBPrimaryKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields) + if err != nil { + return err + } + // cache handleVals here to calculate the total chunks + for i, partition := range partitions { + handleVals, err := selectTiDBPartitionRegion(tctx, conn, db, tbl, partition) + if err != nil { + return err + } + totalChunk += len(handleVals) + 1 + cachedHandleVals[i] = handleVals + } + for i, partition := range partitions { + err := d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk) + if err != nil { + return err + } + startChunkIdx += len(cachedHandleVals[i]) + 1 + } + return nil +} + +func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context, + conn *sql.Conn, meta TableMeta, taskChan chan<- Task, + handleColNames []string, handleVals [][]string, partition string, startChunkIdx, totalChunk int) error { if len(handleVals) == 0 { - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition) } + conf := d.conf + db, tbl := meta.DatabaseName(), meta.TableName() selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) if err != nil { return err @@ -601,8 +647,8 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn orderByClause := buildOrderByClauseString(handleColNames) for i, w := range where { - query := buildSelectQuery(db, tbl, selectField, buildWhereCondition(conf, w), orderByClause) - task := NewTaskTableData(meta, newTableData(query, selectLen, false), i, len(where)) + query := buildSelectQuery(db, tbl, selectField, partition, buildWhereCondition(conf, w), orderByClause) + task := NewTaskTableData(meta, newTableData(query, selectLen, false), i+startChunkIdx, totalChunk) ctxDone := d.sendTaskToChan(tctx, task, taskChan) if ctxDone { return tctx.Err() @@ -617,20 +663,11 @@ func (d *Dumper) L() log.Logger { } func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, pkColTypes, err := GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) - if err != nil { - return nil, nil, errors.Trace(err) - } - hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) + pkFields, pkColTypes, err := selectTiDBPrimaryKeyFields(conn, dbName, tableName, nil) if err != nil { return nil, nil, errors.Trace(err) } - if hasImplicitRowID { - pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} - } - if len(pkFields) == 0 { - return pkFields, pkVals, nil - } + query := buildTiDBTableSampleQuery(pkFields, dbName, tableName) rows, err := conn.QueryContext(tctx, query) if err != nil { @@ -656,7 +693,8 @@ func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, table pkVals = append(pkVals, pkValRow) iter.Next() } - return pkFields, pkVals, nil + iter.Close() + return pkFields, pkVals, iter.Error() } func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string { @@ -669,78 +707,121 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks) } -func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - var pkColTypes []string +func selectTiDBPrimaryKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) if err != nil { - return nil, nil, errors.Trace(err) + return } if hasImplicitRowID { pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} } else { pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) - if err != nil { - return nil, nil, errors.Trace(err) - } - if len(pkFields) != 1 || len(pkColTypes) != 1 { - return nil, nil, errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) - } - if _, ok := dataTypeNum[pkColTypes[0]]; !ok { - return nil, nil, errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + if err == nil { + if checkPkFields != nil { + err = checkPkFields(pkFields, pkColTypes) + } } } - if len(pkFields) == 0 { - return pkFields, pkVals, nil + return +} + +func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) { + if len(pkFields) != 1 || len(pkColTypes) != 1 { + err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + return + } + if _, ok := dataTypeNum[pkColTypes[0]]; !ok { + err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) } + return +} - rows, err := conn.QueryContext(tctx, "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;", dbName, tableName) +func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { + pkFields, _, err = selectTiDBPrimaryKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) if err != nil { return } - defer rows.Close() + var ( startKey, decodedKey sql.NullString - rowID int + rowID = -1 + ) + const ( + tableRegionSql = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;" + tidbRowID = "_tidb_rowid=" ) logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName)) - for rows.Next() { + err = simpleQueryWithArgs(conn, func(rows *sql.Rows) error { rowID++ err = rows.Scan(&startKey, &decodedKey) if err != nil { - return + return errors.Trace(err) } // first region's start key has no use. It may come from another table or might be invalid - if rowID == 1 { - continue + if rowID == 0 { + return nil } if !startKey.Valid { logger.Debug("meet invalid start key", zap.Int("rowID", rowID)) - continue + return nil } if !decodedKey.Valid { logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String)) - continue + return nil } - pkVal, err := getTiDBRowIDFromDecodedKey(decodedKey.String) + pkVal, err := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String) if err != nil { logger.Debug("fail to extract pkVal from decoded start key", - zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String)) + zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), zap.Error(err)) } else { - pkVals = append(pkVals, pkVal) + pkVals = append(pkVals, []string{pkVal}) } - } + return nil + }, tableRegionSql, dbName, tableName) return } -func getTiDBRowIDFromDecodedKey(key string) ([]string, error) { +func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName, partition string) (pkVals [][]string, err error) { + var ( + rows *sql.Rows + startKeys []string + ) const ( - tidbRowID = "_tidb_rowid=" + partitionRegionSql = "SHOW TABLE `%s`.`%s` PARTITION (`%s`) REGIONS" + regionRowKey = "r_" ) - if p := strings.Index(key, tidbRowID); p != -1 { - p += len(tidbRowID) - return []string{key[p:]}, nil + logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition)) + rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSql, dbName, tableName, partition)) + if err != nil { + err = errors.Trace(err) + return + } + startKeys, err = GetSpecifiedColumnValue(rows, "START_KEY") + if err != nil { + return + } + for rowID, startKey := range startKeys { + if rowID == 0 { + continue + } + pkVal, err := extractTiDBRowIDFromDecodedKey(regionRowKey, startKey) + if err != nil { + logger.Debug("show table region start key doesn't have rowID", + zap.Int("rowID", rowID), zap.String("startKey", startKey), zap.Error(err)) + } else { + pkVals = append(pkVals, []string{pkVal}) + } + } + + return +} + +func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) { + if p := strings.Index(key, indexField); p != -1 { + p += len(indexField) + return key[p:], nil } - return nil, errors.Errorf("decoded key %s doesn't have _tidb_rowid= field", key) + return "", errors.Errorf("decoded key %s doesn't have %s field", key, indexField) } func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error { diff --git a/v4/export/sql.go b/v4/export/sql.go index 2e160d8b..68605698 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -186,7 +186,7 @@ func SelectVersion(db *sql.DB) (string, error) { } // SelectAllFromTable dumps data serialized from a specified table -func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR, error) { +func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta, partition string) (TableDataIR, error) { database, table := meta.DatabaseName(), meta.TableName() selectedField, selectLen, err := buildSelectField(db, database, table, conf.CompleteInsert) if err != nil { @@ -197,7 +197,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR if err != nil { return nil, err } - query := buildSelectQuery(database, table, selectedField, buildWhereCondition(conf, ""), orderByClause) + query := buildSelectQuery(database, table, selectedField, partition, buildWhereCondition(conf, ""), orderByClause) return &tableData{ query: query, @@ -205,7 +205,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR }, nil } -func buildSelectQuery(database, table string, fields string, where string, orderByClause string) string { +func buildSelectQuery(database, table, fields, partition, where, orderByClause string) string { var query strings.Builder query.WriteString("SELECT ") if fields == "" { @@ -218,7 +218,12 @@ func buildSelectQuery(database, table string, fields string, where string, order query.WriteString(escapeString(database)) query.WriteString("`.`") query.WriteString(escapeString(table)) - query.WriteString("`") + query.WriteByte('`') + if partition != "" { + query.WriteString(" PARTITION(`") + query.WriteString(escapeString(partition)) + query.WriteString("`)") + } if where != "" { query.WriteString(" ") @@ -435,6 +440,7 @@ func ShowMasterStatus(db *sql.Conn) ([]string, error) { // GetSpecifiedColumnValue get columns' values whose name is equal to columnName func GetSpecifiedColumnValue(rows *sql.Rows, columnName string) ([]string, error) { + defer rows.Close() var strs []string columns, _ := rows.Columns() addr := make([]interface{}, len(columns)) @@ -458,7 +464,8 @@ func GetSpecifiedColumnValue(rows *sql.Rows, columnName string) ([]string, error strs = append(strs, oneRow[fieldIndex].String) } } - return strs, nil + rows.Close() + return strs, errors.Trace(rows.Err()) } // GetPdAddrs gets PD address from TiDB @@ -470,7 +477,6 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - defer rows.Close() return GetSpecifiedColumnValue(rows, "STATUS_ADDRESS") } @@ -483,7 +489,6 @@ func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - defer rows.Close() return GetSpecifiedColumnValue(rows, "DDL_ID") } @@ -817,7 +822,7 @@ func simpleQueryWithArgs(conn *sql.Conn, handleOneRow func(*sql.Rows) error, sql } } rows.Close() - return rows.Err() + return errors.Annotatef(rows.Err(), "sql: %s", sql) } func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (string, error) { @@ -964,14 +969,14 @@ func buildWhereCondition(conf *Config, where string) string { var query strings.Builder separator := "WHERE" if conf.Where != "" { - query.WriteString(" ") query.WriteString(separator) - query.WriteString(" ") + query.WriteByte(' ') query.WriteString(conf.Where) + query.WriteByte(' ') separator = "AND" + query.WriteByte(' ') } if where != "" { - query.WriteString(" ") query.WriteString(separator) query.WriteString(" ") query.WriteString(where) @@ -982,3 +987,20 @@ func buildWhereCondition(conf *Config, where string) string { func escapeString(s string) string { return strings.ReplaceAll(s, "`", "``") } + +// GetPartitionNames get partition names from a specified table +func GetPartitionNames(db *sql.Conn, schema, table string) (partitions []string, err error) { + partitions = make([]string, 0) + var partitionName sql.NullString + err = simpleQueryWithArgs(db, func(rows *sql.Rows) error { + err := rows.Scan(&partitionName) + if err != nil { + return errors.Trace(err) + } + if partitionName.Valid { + partitions = append(partitions, partitionName.String) + } + return nil + }, "SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?", schema, table) + return +} diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index e346fa15..a2119303 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -653,7 +653,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { orderByClause := buildOrderByClauseString(handleColNames) for i, w := range testCase.expectedWhereClauses { - query := buildSelectQuery(database, table, "*", buildWhereCondition(d.conf, w), orderByClause) + query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) task := <-taskChan taskTableData, ok := task.(*TaskTableData) c.Assert(ok, IsTrue) From 0d80f1075ff425e3a31ed1d815405e3584c55ace Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 23 Apr 2021 19:04:08 +0800 Subject: [PATCH 03/11] add unit tests --- v4/export/dump.go | 14 +- v4/export/sql.go | 2 +- v4/export/sql_test.go | 540 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 530 insertions(+), 26 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 504fb021..eda74d77 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -400,13 +400,13 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta } } -func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string) error { +func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string, currentChunk, totalChunks int) error { conf := d.conf tableIR, err := SelectAllFromTable(conf, conn, meta, partition) if err != nil { return err } - task := NewTaskTableData(meta, tableIR, 0, 1) + task := NewTaskTableData(meta, tableIR, currentChunk, totalChunks) ctxDone := d.sendTaskToChan(tctx, task, taskChan) if ctxDone { return tctx.Err() @@ -432,7 +432,7 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.String("database", meta.DatabaseName()), zap.String("table", meta.TableName())) } - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } // concurrentDumpTable tries to split table into several chunks to dump @@ -452,7 +452,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met // skip split chunk logic if not found proper field tctx.L().Warn("fallback to sequential dump due to no proper field", zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field) @@ -475,7 +475,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.Uint64("conf.rows", conf.Rows), zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "") + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } // every chunk would have eventual adjustments @@ -635,7 +635,7 @@ func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, handleColNames []string, handleVals [][]string, partition string, startChunkIdx, totalChunk int) error { if len(handleVals) == 0 { - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition, startChunkIdx, totalChunk) } conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() @@ -787,7 +787,7 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t startKeys []string ) const ( - partitionRegionSql = "SHOW TABLE `%s`.`%s` PARTITION (`%s`) REGIONS" + partitionRegionSql = "SHOW TABLE `%s`.`%s` PARTITION(`%s`) REGIONS" regionRowKey = "r_" ) logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition)) diff --git a/v4/export/sql.go b/v4/export/sql.go index 68605698..3c9be946 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -620,7 +620,7 @@ func buildSelectField(db *sql.Conn, dbName, tableName string, completeInsert boo } func buildWhereClauses(handleColNames []string, handleVals [][]string) []string { - if len(handleColNames) == 0 { + if len(handleColNames) == 0 || len(handleVals) == 0 { return nil } quotaCols := make([]string, len(handleColNames)) diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index a2119303..a9188b5a 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -87,7 +87,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", orderByClause) + q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `_tidb_rowid`") // _tidb_rowid is unavailable, or PKIsHandle. @@ -107,7 +107,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", orderByClause) + q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`") c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -130,7 +130,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", orderByClause) + q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`", cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) @@ -154,7 +154,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", orderByClause) + q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) @@ -173,7 +173,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", "") + q := buildSelectQuery("test", "t", selectedField, "", "", "") c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) c.Assert(mock.ExpectationsWereMet(), IsNil, cmt) } @@ -409,7 +409,7 @@ func (s *testSQLSuite) TestGetSuitableRows(c *C) { } } -func (s *testSQLSuite) TestBuildWhereClauses(c *C) { +func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) defer db.Close() @@ -434,18 +434,44 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { handleColTypes []string handleVals [][]driver.Value expectedWhereClauses []string + hasTiDBRowID bool }{ { []string{}, []string{}, [][]driver.Value{}, nil, + false, }, { []string{"a"}, []string{"bigint"}, [][]driver.Value{{1}}, []string{"`a`<1", "`a`>=1"}, + false, + }, + // check whether dumpling can turn to dump whole table + { + []string{"a"}, + []string{"bigint"}, + [][]driver.Value{}, + nil, + false, + }, + // check whether dumpling can turn to dump whole table + { + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]driver.Value{}, + nil, + true, + }, + { + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]driver.Value{{1}}, + []string{"`_tidb_rowid`<1", "`_tidb_rowid`>=1"}, + true, }, { []string{"a"}, @@ -456,12 +482,14 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { {3}, }, []string{"`a`<1", "`a`>=1 and `a`<2", "`a`>=2 and `a`<3", "`a`>=3"}, + false, }, { []string{"a", "b"}, []string{"bigint", "bigint"}, [][]driver.Value{{1, 2}}, []string{"`a`<1 or(`a`=1 and `b`<2)", "`a`>1 or(`a`=1 and `b`>=2)"}, + false, }, { []string{"a", "b"}, @@ -477,6 +505,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>3 and `a`<5)or(`a`=3 and(`b`>=4))or(`a`=5 and(`b`<6))", "`a`>5 or(`a`=5 and `b`>=6)", }, + false, }, { []string{"a", "b", "c"}, @@ -490,6 +519,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>1 and `a`<4)or(`a`=1 and(`b`>2 or(`b`=2 and `c`>=3)))or(`a`=4 and(`b`<5 or(`b`=5 and `c`<6)))", "`a`>4 or(`a`=4 and `b`>5)or(`a`=4 and `b`=5 and `c`>=6)", }, + false, }, { []string{"a", "b", "c"}, @@ -503,6 +533,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and((`b`>2 and `b`<4)or(`b`=2 and(`c`>=3))or(`b`=4 and(`c`<5)))", "`a`>1 or(`a`=1 and `b`>4)or(`a`=1 and `b`=4 and `c`>=5)", }, + false, }, { []string{"a", "b", "c"}, @@ -516,6 +547,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and `b`=2 and(`c`>=3 and `c`<8)", "`a`>1 or(`a`=1 and `b`>2)or(`a`=1 and `b`=2 and `c`>=8)", }, + false, }, // special case: avoid return same samples { @@ -530,6 +562,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "false", "`a`>1 or(`a`=1 and `b`>2)or(`a`=1 and `b`=2 and `c`>=3)", }, + false, }, // special case: numbers has bigger lexicographically order but lower number { @@ -544,6 +577,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>12 and `a`<111)or(`a`=12 and(`b`>2 or(`b`=2 and `c`>=3)))or(`a`=111 and(`b`<4 or(`b`=4 and `c`<5)))", // should return sql correctly "`a`>111 or(`a`=111 and `b`>4)or(`a`=111 and `b`=4 and `c`>=5)", }, + false, }, // test string fields { @@ -558,6 +592,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and((`b`>2 and `b`<4)or(`b`=2 and(`c`>='3'))or(`b`=4 and(`c`<'5')))", "`a`>1 or(`a`=1 and `b`>4)or(`a`=1 and `b`=4 and `c`>='5')", }, + false, }, { []string{"a", "b", "c", "d"}, @@ -571,6 +606,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>1 and `a`<5)or(`a`=1 and(`b`>2 or(`b`=2 and `c`>3)or(`b`=2 and `c`=3 and `d`>=4)))or(`a`=5 and(`b`<6 or(`b`=6 and `c`<7)or(`b`=6 and `c`=7 and `d`<8)))", "`a`>5 or(`a`=5 and `b`>6)or(`a`=5 and `b`=6 and `c`>7)or(`a`=5 and `b`=6 and `c`=7 and `d`>=8)", }, + false, }, } transferHandleValStrings := func(handleColTypes []string, handleVals [][]driver.Value) [][]string { @@ -595,8 +631,8 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { return handleValStrings } - for i, testCase := range testCases { - c.Log(fmt.Sprintf("case #%d", i)) + for caseID, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", caseID)) handleColNames := testCase.handleColNames handleColTypes := testCase.handleColTypes handleVals := testCase.handleVals @@ -623,44 +659,512 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { }, } + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnError(&mysql.MyError{ + Code: mysql.ER_BAD_FIELD_ERROR, + State: "42S22", + Message: "Unknown column '_tidb_rowid' in 'field list'", + }) + rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) + for i := range handleColNames { + rows.AddRow(handleColNames[i], handleColTypes[i]) + } + mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } + + rows := sqlmock.NewRows(handleColNames) + for _, handleVal := range handleVals { + rows.AddRow(handleVal...) + } + mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows) + + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) + for _, handleCol := range handleColNames { + rows.AddRow(handleCol, "") + } + mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). + WillReturnRows(rows) + // special case, no value found, will scan whole table and try build order clause + if len(handleVals) == 0 { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } + + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + orderByClause := buildOrderByClauseString(handleColNames) + + checkQuery := func(i int, query string) { + task := <-taskChan + taskTableData, ok := task.(*TaskTableData) + c.Assert(ok, IsTrue) + c.Assert(taskTableData.ChunkIndex, Equals, i) + data, ok := taskTableData.Data.(*tableData) + c.Assert(ok, IsTrue) + c.Assert(data.query, Equals, query) + } + + // special case, no value found + if len(handleVals) == 0 { + orderByClause = "ORDER BY `_tidb_rowid`" + query := buildSelectQuery(database, table, "*", "", "", orderByClause) + checkQuery(0, query) + continue + } + + for i, w := range testCase.expectedWhereClauses { + query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + checkQuery(i, query) + } + } + } +} + +func (s *testSQLSuite) TestBuildPartitionClauses(c *C) { + const ( + dbName = "test" + tbName = "t" + fields = "*" + partition = "p0" + where = "WHERE a > 10" + orderByClause = "ORDER BY a" + ) + testCases := []struct { + partition string + where string + orderByClause string + expectedQuery string + }{ + { + "", + "", + "", + "SELECT * FROM `test`.`t`", + }, + { + partition, + "", + "", + "SELECT * FROM `test`.`t` PARTITION(`p0`)", + }, + { + partition, + where, + "", + "SELECT * FROM `test`.`t` PARTITION(`p0`) WHERE a > 10", + }, + { + partition, + "", + orderByClause, + "SELECT * FROM `test`.`t` PARTITION(`p0`) ORDER BY a", + }, + { + partition, + where, + orderByClause, + "SELECT * FROM `test`.`t` PARTITION(`p0`) WHERE a > 10 ORDER BY a", + }, + { + "", + where, + orderByClause, + "SELECT * FROM `test`.`t` WHERE a > 10 ORDER BY a", + }, + } + for _, testCase := range testCases { + query := buildSelectQuery(dbName, tbName, fields, testCase.partition, testCase.where, testCase.orderByClause) + c.Assert(query, Equals, testCase.expectedQuery) + } +} + +func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + + d := &Dumper{ + tctx: tctx, + conf: DefaultConfig(), + cancelCtx: cancel, + } + d.conf.ServerInfo = ServerInfo{ + ServerType: ServerTypeTiDB, + ServerVersion: gcSafePointVersion, + } + database := "foo" + table := "bar" + + testCases := []struct { + regionResults [][]driver.Value + handleColNames []string + handleColTypes []string + expectedWhereClauses []string + hasTiDBRowID bool + }{ + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + }, + []string{"a"}, + []string{"bigint"}, + []string{ + "", + }, + false, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + []string{ + "", + }, + true, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + {"7480000000000000FF335F728000000000FF0EA6010000000000FA", "tableID=51, _tidb_rowid=960001"}, + {"7480000000000000FF335F728000000000FF1D4C010000000000FA", "tableID=51, _tidb_rowid=1920001"}, + {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, + }, + []string{"a"}, + []string{"bigint"}, + []string{ + "`a`<960001", + "`a`>=960001 and `a`<1920001", + "`a`>=1920001 and `a`<2880001", + "`a`>=2880001", + }, + false, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + {"7480000000000000FF335F728000000000FF0EA6010000000000FA", "tableID=51, _tidb_rowid=960001"}, + // one invalid key + {"7520000000000000FF335F728000000000FF0EA6010000000000FA", "7520000000000000FF335F728000000000FF0EA6010000000000FA"}, + {"7480000000000000FF335F728000000000FF1D4C010000000000FA", "tableID=51, _tidb_rowid=1920001"}, + {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + []string{ + "`_tidb_rowid`<960001", + "`_tidb_rowid`>=960001 and `_tidb_rowid`<1920001", + "`_tidb_rowid`>=1920001 and `_tidb_rowid`<2880001", + "`_tidb_rowid`>=2880001", + }, + true, + }, + } + + for caseID, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", caseID)) + handleColNames := testCase.handleColNames + handleColTypes := testCase.handleColTypes + regionResults := testCase.regionResults + + // Test build tasks through table region + taskChan := make(chan Task, 128) + quotaCols := make([]string, 0, len(handleColNames)) + for _, col := range quotaCols { + quotaCols = append(quotaCols, wrapBackTicks(col)) + } + selectFields := strings.Join(quotaCols, ",") + meta := &tableMeta{ + database: database, + table: table, + selectedField: selectFields, + specCmts: []string{ + "/*!40101 SET NAMES binary*/;", + }, + } + + mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). + WithArgs(database, table).WillReturnRows(sqlmock.NewRows([]string{"PARTITION_NAME"}).AddRow(nil)) + + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnError(&mysql.MyError{ + Code: mysql.ER_BAD_FIELD_ERROR, + State: "42S22", + Message: "Unknown column '_tidb_rowid' in 'field list'", + }) rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) for i := range handleColNames { rows.AddRow(handleColNames[i], handleColTypes[i]) } mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } + + rows := sqlmock.NewRows([]string{"START_KEY", "tidb_decode_key(START_KEY)"}) + for _, regionResult := range regionResults { + rows.AddRow(regionResult...) + } + mock.ExpectQuery("SELECT START_KEY,tidb_decode_key\\(START_KEY\\) from INFORMATION_SCHEMA.TIKV_REGION_STATUS"). + WithArgs(database, table).WillReturnRows(rows) + + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) + for _, handleCol := range handleColNames { + rows.AddRow(handleCol, "") + } + mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). + WillReturnRows(rows) + + orderByClause := buildOrderByClauseString(handleColNames) + // special case, no enough value to split chunks + if len(regionResults) <= 1 { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + orderByClause = "ORDER BY `_tidb_rowid`" + } + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + + for i, w := range testCase.expectedWhereClauses { + query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + task := <-taskChan + taskTableData, ok := task.(*TaskTableData) + c.Assert(ok, IsTrue) + c.Assert(taskTableData.ChunkIndex, Equals, i) + data, ok := taskTableData.Data.(*tableData) + c.Assert(ok, IsTrue) + c.Assert(data.query, Equals, query) + } + } +} + +func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + + d := &Dumper{ + tctx: tctx, + conf: DefaultConfig(), + cancelCtx: cancel, + } + d.conf.ServerInfo = ServerInfo{ + ServerType: ServerTypeTiDB, + ServerVersion: gcSafePointVersion, + } + database := "foo" + table := "bar" + partitions := []string{"p0", "p1", "p2"} + + testCases := []struct { + regionResults [][][]driver.Value + handleColNames []string + handleColTypes []string + expectedWhereClauses [][]string + hasTiDBRowID bool + dumpWholeTable bool + }{ + { + [][][]driver.Value{ + { + {6009, "t_121_i_1_0380000000000ea6010380000000000ea601", "t_121_", 6010, 1, 6010, 0, 0, 0, 74, 1052002}, + {6011, "t_121_", "t_121_i_1_0380000000000ea6010380000000000ea601", 6012, 1, 6012, 0, 0, 0, 68, 972177}, + }, + { + {6015, "t_122_i_1_0380000000002d2a810380000000002d2a81", "t_122_", 6016, 1, 6016, 0, 0, 0, 77, 1092962}, + {6017, "t_122_", "t_122_i_1_0380000000002d2a810380000000002d2a81", 6018, 1, 6018, 0, 0, 0, 66, 939975}, + }, + { + {6021, "t_123_i_1_0380000000004baf010380000000004baf01", "t_123_", 6022, 1, 6022, 0, 0, 0, 85, 1206726}, + {6023, "t_123_", "t_123_i_1_0380000000004baf010380000000004baf01", 6024, 1, 6024, 0, 0, 0, 65, 927576}, + }, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]string{ + {""}, {""}, {""}, + }, + true, + true, + }, + { + [][][]driver.Value{ + { + {6009, "t_121_i_1_0380000000000ea6010380000000000ea601", "t_121_r_10001", 6010, 1, 6010, 0, 0, 0, 74, 1052002}, + {6013, "t_121_r_10001", "t_121_r_970001", 6014, 1, 6014, 0, 0, 0, 75, 975908}, + {6003, "t_121_r_970001", "t_122_", 6004, 1, 6004, 0, 0, 0, 79, 1022285}, + {6011, "t_121_", "t_121_i_1_0380000000000ea6010380000000000ea601", 6012, 1, 6012, 0, 0, 0, 68, 972177}, + }, + { + {6015, "t_122_i_1_0380000000002d2a810380000000002d2a81", "t_122_r_2070760", 6016, 1, 6016, 0, 0, 0, 77, 1092962}, + {6019, "t_122_r_2070760", "t_122_r_3047115", 6020, 1, 6020, 0, 0, 0, 75, 959650}, + {6005, "t_122_r_3047115", "t_123_", 6006, 1, 6006, 0, 0, 0, 77, 992339}, + {6017, "t_122_", "t_122_i_1_0380000000002d2a810380000000002d2a81", 6018, 1, 6018, 0, 0, 0, 66, 939975}, + }, + { + {6021, "t_123_i_1_0380000000004baf010380000000004baf01", "t_123_r_4186953", 6022, 1, 6022, 0, 0, 0, 85, 1206726}, + {6025, "t_123_r_4186953", "t_123_r_5165682", 6026, 1, 6026, 0, 0, 0, 74, 951379}, + {6007, "t_123_r_5165682", "t_124_", 6008, 1, 6008, 0, 0, 0, 71, 918488}, + {6023, "t_123_", "t_123_i_1_0380000000004baf010380000000004baf01", 6024, 1, 6024, 0, 0, 0, 65, 927576}, + }, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]string{ + { + "`_tidb_rowid`<10001", + "`_tidb_rowid`>=10001 and `_tidb_rowid`<970001", + "`_tidb_rowid`>=970001", + }, + { + "`_tidb_rowid`<2070760", + "`_tidb_rowid`>=2070760 and `_tidb_rowid`<3047115", + "`_tidb_rowid`>=3047115", + }, + { + "`_tidb_rowid`<4186953", + "`_tidb_rowid`>=4186953 and `_tidb_rowid`<5165682", + "`_tidb_rowid`>=5165682", + }, + }, + true, + false, + }, + { + [][][]driver.Value{ + { + {6041, "t_134_", "t_134_r_960001", 6042, 1, 6042, 0, 0, 0, 69, 964987}, + {6035, "t_134_r_960001", "t_135_", 6036, 1, 6036, 0, 0, 0, 75, 1052130}, + }, + { + {6043, "t_135_", "t_135_r_2960001", 6044, 1, 6044, 0, 0, 0, 69, 969576}, + {6037, "t_135_r_2960001", "t_136_", 6038, 1, 6038, 0, 0, 0, 72, 1014464}, + }, + { + {6045, "t_136_", "t_136_r_4960001", 6046, 1, 6046, 0, 0, 0, 68, 957557}, + {6039, "t_136_r_4960001", "t_137_", 6040, 1, 6040, 0, 0, 0, 75, 1051579}, + }, + }, + []string{"a"}, + []string{"bigint"}, + [][]string{ + + { + "`a`<960001", + "`a`>=960001", + }, + { + "`a`<2960001", + "`a`>=2960001", + }, + { + "`a`<4960001", + "`a`>=4960001", + }, + }, + false, + false, + }, + } + + for i, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", i)) + handleColNames := testCase.handleColNames + handleColTypes := testCase.handleColTypes + regionResults := testCase.regionResults + + // Test build tasks through table region + taskChan := make(chan Task, 128) + quotaCols := make([]string, 0, len(handleColNames)) + for _, col := range quotaCols { + quotaCols = append(quotaCols, wrapBackTicks(col)) + } + selectFields := strings.Join(quotaCols, ",") + meta := &tableMeta{ + database: database, + table: table, + selectedField: selectFields, + specCmts: []string{ + "/*!40101 SET NAMES binary*/;", + }, + } + + rows := sqlmock.NewRows([]string{"PARTITION_NAME"}) + for _, partition := range partitions { + rows.AddRow(partition) + } + mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). + WithArgs(database, table).WillReturnRows(rows) + + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). WillReturnError(&mysql.MyError{ Code: mysql.ER_BAD_FIELD_ERROR, State: "42S22", Message: "Unknown column '_tidb_rowid' in 'field list'", }) + rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) + for i := range handleColNames { + rows.AddRow(handleColNames[i], handleColTypes[i]) + } + mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } - rows = sqlmock.NewRows(handleColNames) - for _, handleVal := range handleVals { - rows.AddRow(handleVal...) + for i, partition := range partitions { + rows := sqlmock.NewRows([]string{"REGION_ID", "START_KEY", "END_KEY", "LEADER_ID", "LEADER_STORE_ID", "PEERS", "SCATTERING", "WRITTEN_BYTES", "READ_BYTES", "APPROXIMATE_SIZE(MB)", "APPROXIMATE_KEYS"}) + for _, regionResult := range regionResults[i] { + rows.AddRow(regionResult...) } - mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf("SHOW TABLE `%s`.`%s` PARTITION\\(`%s`\\) REGIONS", database, table, partition)). + WillReturnRows(rows) + } + for range partitions { rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) for _, handleCol := range handleColNames { rows.AddRow(handleCol, "") } mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). WillReturnRows(rows) + // special case, dump whole table + if testCase.dumpWholeTable { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } + } - c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) - c.Assert(mock.ExpectationsWereMet(), IsNil) - orderByClause := buildOrderByClauseString(handleColNames) + orderByClause := buildOrderByClauseString(handleColNames) + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) - for i, w := range testCase.expectedWhereClauses { - query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + chunkIdx := 0 + for i, partition := range partitions { + for _, w := range testCase.expectedWhereClauses[i] { + query := buildSelectQuery(database, table, "*", partition, buildWhereCondition(d.conf, w), orderByClause) task := <-taskChan taskTableData, ok := task.(*TaskTableData) c.Assert(ok, IsTrue) - c.Assert(taskTableData.ChunkIndex, Equals, i) + c.Assert(taskTableData.ChunkIndex, Equals, chunkIdx) data, ok := taskTableData.Data.(*tableData) c.Assert(ok, IsTrue) c.Assert(data.query, Equals, query) + chunkIdx++ } } } From d83528d0d4facf3c40e396d21bebf3ae519bffc9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 23 Apr 2021 19:24:04 +0800 Subject: [PATCH 04/11] fix make check --- v4/export/dump.go | 34 ++++++++++++++++++---------------- v4/export/sql.go | 4 +++- v4/export/sql_test.go | 25 +++++++++++++++---------- v4/export/sql_type.go | 6 +++--- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index eda74d77..190736cf 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -442,7 +442,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met if conf.ServerInfo.ServerType == ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 { - return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan, conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0) + return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) } field, err := pickupPossibleField(db, tbl, conn, conf) if err != nil { @@ -569,7 +569,7 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string) return min, max, nil } -func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, useTableSample bool) error { +func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error { db, tbl := meta.DatabaseName(), meta.TableName() var ( @@ -577,14 +577,15 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn handleVals [][]string err error ) - if useTableSample { + if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 { tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", zap.String("database", db), zap.String("table", tbl)) handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl) } else { tctx.L().Debug("dumping TiDB tables with TABLE REGIONS", zap.String("database", db), zap.String("table", tbl)) - partitions, err := GetPartitionNames(conn, db, tbl) + var partitions []string + partitions, err = GetPartitionNames(conn, db, tbl) if err == nil { if len(partitions) == 0 { handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl) @@ -747,7 +748,7 @@ func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, table rowID = -1 ) const ( - tableRegionSql = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;" + tableRegionSQL = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;" tidbRowID = "_tidb_rowid=" ) logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName)) @@ -769,16 +770,17 @@ func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, table logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String)) return nil } - pkVal, err := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String) - if err != nil { + pkVal, err2 := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String) + if err2 != nil { logger.Debug("fail to extract pkVal from decoded start key", - zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), zap.Error(err)) + zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), zap.Error(err2)) } else { pkVals = append(pkVals, []string{pkVal}) } return nil - }, tableRegionSql, dbName, tableName) - return + }, tableRegionSQL, dbName, tableName) + + return pkFields, pkVals, errors.Trace(err) } func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName, partition string) (pkVals [][]string, err error) { @@ -787,11 +789,11 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t startKeys []string ) const ( - partitionRegionSql = "SHOW TABLE `%s`.`%s` PARTITION(`%s`) REGIONS" + partitionRegionSQL = "SHOW TABLE `%s`.`%s` PARTITION(`%s`) REGIONS" regionRowKey = "r_" ) logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition)) - rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSql, dbName, tableName, partition)) + rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSQL, dbName, tableName, partition)) if err != nil { err = errors.Trace(err) return @@ -804,16 +806,16 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t if rowID == 0 { continue } - pkVal, err := extractTiDBRowIDFromDecodedKey(regionRowKey, startKey) - if err != nil { + pkVal, err2 := extractTiDBRowIDFromDecodedKey(regionRowKey, startKey) + if err2 != nil { logger.Debug("show table region start key doesn't have rowID", - zap.Int("rowID", rowID), zap.String("startKey", startKey), zap.Error(err)) + zap.Int("rowID", rowID), zap.String("startKey", startKey), zap.Error(err2)) } else { pkVals = append(pkVals, []string{pkVal}) } } - return + return pkVals, err } func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) { diff --git a/v4/export/sql.go b/v4/export/sql.go index 3c9be946..3df6b774 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -18,6 +18,8 @@ import ( "go.uber.org/zap" ) +const orderByTiDBRowID = "ORDER BY `_tidb_rowid`" + // ShowDatabases shows the databases of a database server. func ShowDatabases(db *sql.Conn) ([]string, error) { var res oneStrColumnTable @@ -248,7 +250,7 @@ func buildOrderByClause(conf *Config, db *sql.Conn, database, table string) (str return "", errors.Trace(err) } if ok { - return "ORDER BY `_tidb_rowid`", nil + return orderByTiDBRowID, nil } } cols, err := GetPrimaryKeyColumns(db, database, table) diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index a9188b5a..eb01a1bf 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -198,7 +198,7 @@ func (s *testSQLSuite) TestBuildOrderByClause(c *C) { orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") c.Assert(err, IsNil) - c.Assert(orderByClause, Equals, "ORDER BY `_tidb_rowid`") + c.Assert(orderByClause, Equals, orderByTiDBRowID) // _tidb_rowid is unavailable, or PKIsHandle. mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). @@ -358,11 +358,13 @@ func (s *testSQLSuite) TestGetSuitableRows(c *C) { defer db.Close() conn, err := db.Conn(context.Background()) c.Assert(err, IsNil) - const query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=\\? and table_name=\\?;" tctx, cancel := tcontext.Background().WithCancel() defer cancel() - database := "foo" - table := "bar" + const ( + query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=\\? and table_name=\\?;" + database = "foo" + table = "bar" + ) testCases := []struct { avgRowLength uint64 @@ -426,8 +428,11 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { ServerType: ServerTypeTiDB, ServerVersion: tableSampleVersion, } - database := "foo" - table := "bar" + + const ( + database = "foo" + table = "bar" + ) testCases := []struct { handleColNames []string @@ -710,7 +715,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // special case, no value found if len(handleVals) == 0 { - orderByClause = "ORDER BY `_tidb_rowid`" + orderByClause = orderByTiDBRowID query := buildSelectQuery(database, table, "*", "", "", orderByClause) checkQuery(0, query) continue @@ -930,7 +935,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { if len(regionResults) <= 1 { mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). WillReturnResult(sqlmock.NewResult(0, 0)) - orderByClause = "ORDER BY `_tidb_rowid`" + orderByClause = orderByTiDBRowID } c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -1119,7 +1124,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { State: "42S22", Message: "Unknown column '_tidb_rowid' in 'field list'", }) - rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) for i := range handleColNames { rows.AddRow(handleColNames[i], handleColTypes[i]) } @@ -1127,7 +1132,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { } for i, partition := range partitions { - rows := sqlmock.NewRows([]string{"REGION_ID", "START_KEY", "END_KEY", "LEADER_ID", "LEADER_STORE_ID", "PEERS", "SCATTERING", "WRITTEN_BYTES", "READ_BYTES", "APPROXIMATE_SIZE(MB)", "APPROXIMATE_KEYS"}) + rows = sqlmock.NewRows([]string{"REGION_ID", "START_KEY", "END_KEY", "LEADER_ID", "LEADER_STORE_ID", "PEERS", "SCATTERING", "WRITTEN_BYTES", "READ_BYTES", "APPROXIMATE_SIZE(MB)", "APPROXIMATE_KEYS"}) for _, regionResult := range regionResults[i] { rows.AddRow(regionResult...) } diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index db874fa2..813dea32 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -18,14 +18,14 @@ var ( ) func initColTypeRowReceiverMap() { - var dataTypeStringArr = []string{ + dataTypeStringArr := []string{ "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "ENUM", "SET", "JSON", } - var dataTypeNumArr = []string{ + dataTypeNumArr := []string{ "INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "INT1", "INT2", "INT3", "INT8", "FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION", @@ -33,7 +33,7 @@ func initColTypeRowReceiverMap() { "BOOL", "BOOLEAN", } - var dataTypeBinArr = []string{ + dataTypeBinArr := []string{ "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", "BINARY", "VARBINARY", "BIT", From b1f877096fd12ef56db81654b0361bfd9abc74fb Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 25 Apr 2021 10:57:25 +0800 Subject: [PATCH 05/11] do dump TiDB only when HasTiKV --- v4/export/config.go | 1 + v4/export/dump.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/v4/export/config.go b/v4/export/config.go index 00d86803..18ed43d4 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -567,6 +567,7 @@ var ( type ServerInfo struct { ServerType ServerType ServerVersion *semver.Version + HasTiKV bool } // ServerInfoUnknown is the unknown database type to dumpling diff --git a/v4/export/dump.go b/v4/export/dump.go index 190736cf..270a4b98 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -440,7 +440,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() if conf.ServerInfo.ServerType == ServerTypeTiDB && - conf.ServerInfo.ServerVersion != nil && + conf.ServerInfo.ServerVersion != nil && conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 { return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) } @@ -1136,21 +1136,21 @@ func setSessionParam(d *Dumper) error { if si.ServerType == ServerTypeTiDB && conf.TiDBMemQuotaQuery != UnspecifiedSize { sessionParam[TiDBMemQuotaQueryName] = conf.TiDBMemQuotaQuery } + var err error if snapshot != "" { if si.ServerType != ServerTypeTiDB { return errors.New("snapshot consistency is not supported for this server") } if consistency == consistencyTypeSnapshot { - hasTiKV, err := CheckTiDBWithTiKV(pool) + conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool) if err != nil { return err } - if hasTiKV { + if conf.ServerInfo.HasTiKV { sessionParam["tidb_snapshot"] = snapshot } } } - var err error if d.dbHandle, err = resetDBWithSessionParams(d.tctx, pool, conf.GetDSN(""), conf.SessionParams); err != nil { return errors.Trace(err) } From 8d4454542acdcf5e814cf9bdce31f027aae6e603 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 25 Apr 2021 11:01:03 +0800 Subject: [PATCH 06/11] fix make check --- v4/export/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v4/export/config.go b/v4/export/config.go index 18ed43d4..a5b99f7e 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -565,9 +565,9 @@ var ( // ServerInfo is the combination of ServerType and ServerInfo type ServerInfo struct { + HasTiKV bool ServerType ServerType ServerVersion *semver.Version - HasTiKV bool } // ServerInfoUnknown is the unknown database type to dumpling From cc2c42e697807779c62bf897472dd9b20beb7552 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 25 Apr 2021 11:10:42 +0800 Subject: [PATCH 07/11] fix hasTiKV --- v4/export/sql_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index eb01a1bf..cd44d82e 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -425,6 +425,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { cancelCtx: cancel, } d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, ServerType: ServerTypeTiDB, ServerVersion: tableSampleVersion, } @@ -801,6 +802,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { cancelCtx: cancel, } d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, ServerType: ServerTypeTiDB, ServerVersion: gcSafePointVersion, } @@ -967,6 +969,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { cancelCtx: cancel, } d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, ServerType: ServerTypeTiDB, ServerVersion: gcSafePointVersion, } From 9b203b79c8b3b6a1eef0c0b9f22789b4b94e909e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 25 Apr 2021 11:50:48 +0800 Subject: [PATCH 08/11] fix condition --- v4/export/dump.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 270a4b98..def89249 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -440,8 +440,9 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() if conf.ServerInfo.ServerType == ServerTypeTiDB && - conf.ServerInfo.ServerVersion != nil && conf.ServerInfo.HasTiKV && - conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 { + conf.ServerInfo.ServerVersion != nil && + (conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 || + (conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0)) { return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) } field, err := pickupPossibleField(db, tbl, conn, conf) From bfaf7dc29249eeb6b12a071c235f9934387f6a58 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 25 Apr 2021 15:33:46 +0800 Subject: [PATCH 09/11] address comments --- v4/export/dump.go | 2 +- v4/export/sql.go | 11 +++++++---- v4/export/sql_type.go | 1 + 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index def89249..61487eb2 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -799,7 +799,7 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t err = errors.Trace(err) return } - startKeys, err = GetSpecifiedColumnValue(rows, "START_KEY") + startKeys, err = GetSpecifiedColumnValueAndClose(rows, "START_KEY") if err != nil { return } diff --git a/v4/export/sql.go b/v4/export/sql.go index 3df6b774..527e9cd4 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -440,8 +440,11 @@ func ShowMasterStatus(db *sql.Conn) ([]string, error) { return oneRow, nil } -// GetSpecifiedColumnValue get columns' values whose name is equal to columnName -func GetSpecifiedColumnValue(rows *sql.Rows, columnName string) ([]string, error) { +// GetSpecifiedColumnValueAndClose get columns' values whose name is equal to columnName and close the given rows +func GetSpecifiedColumnValueAndClose(rows *sql.Rows, columnName string) ([]string, error) { + if rows == nil { + return []string{}, nil + } defer rows.Close() var strs []string columns, _ := rows.Columns() @@ -479,7 +482,7 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - return GetSpecifiedColumnValue(rows, "STATUS_ADDRESS") + return GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS") } // GetTiDBDDLIDs gets DDL IDs from TiDB @@ -491,7 +494,7 @@ func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - return GetSpecifiedColumnValue(rows, "DDL_ID") + return GetSpecifiedColumnValueAndClose(rows, "DDL_ID") } // CheckTiDBWithTiKV use sql to check whether current TiDB has TiKV diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index 813dea32..9f702ed9 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -17,6 +17,7 @@ var ( doubleQuotationMark = []byte{'"'} ) +// data type reference: https://dev.mysql.com/doc/refman/8.0/en/data-types.html func initColTypeRowReceiverMap() { dataTypeStringArr := []string{ "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", From 69c1e7c6012590500e0deceed11c876b49c90f19 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 29 Apr 2021 18:15:13 +0800 Subject: [PATCH 10/11] address comments --- v4/export/dump.go | 2 +- v4/export/sql.go | 1 + v4/export/sql_test.go | 2 +- v4/export/sql_type.go | 10 ++++++++-- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 61487eb2..698bb8d6 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -794,7 +794,7 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t regionRowKey = "r_" ) logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition)) - rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSQL, dbName, tableName, partition)) + rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSQL, escapeString(dbName), escapeString(tableName), escapeString(partition))) if err != nil { err = errors.Trace(err) return diff --git a/v4/export/sql.go b/v4/export/sql.go index 527e9cd4..eb0bb1b2 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -446,6 +446,7 @@ func GetSpecifiedColumnValueAndClose(rows *sql.Rows, columnName string) ([]strin return []string{}, nil } defer rows.Close() + columnName = strings.ToUpper(columnName) var strs []string columns, _ := rows.Columns() addr := make([]interface{}, len(columns)) diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index cd44d82e..7d8c94ad 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -1139,7 +1139,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { for _, regionResult := range regionResults[i] { rows.AddRow(regionResult...) } - mock.ExpectQuery(fmt.Sprintf("SHOW TABLE `%s`.`%s` PARTITION\\(`%s`\\) REGIONS", database, table, partition)). + mock.ExpectQuery(fmt.Sprintf("SHOW TABLE `%s`.`%s` PARTITION\\(`%s`\\) REGIONS", escapeString(database), escapeString(table), escapeString(partition))). WillReturnRows(rows) } diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index 9f702ed9..dabb557b 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -17,13 +17,19 @@ var ( doubleQuotationMark = []byte{'"'} ) -// data type reference: https://dev.mysql.com/doc/refman/8.0/en/data-types.html +// There are two kinds of scenes to use this dataType +// The first is to be the receiver of table sample, which will use tidb's INFORMATION_SCHEMA.COLUMNS's DATA_TYPE column, which is from +// https://github.com/pingcap/tidb/blob/619c4720059ea619081b01644ef3084b426d282f/executor/infoschema_reader.go#L654 +// https://github.com/pingcap/parser/blob/8e8ed7927bde11c4cf0967afc5e05ab5aeb14cc7/types/etc.go#L44-70 +// The second is to be the receiver of select row type, which will use sql.DB's rows.DatabaseTypeName(), which is from +// https://github.com/go-sql-driver/mysql/blob/v1.5.0/fields.go#L17-97 func initColTypeRowReceiverMap() { dataTypeStringArr := []string{ "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", - "ENUM", "SET", "JSON", + "ENUM", "SET", "JSON", "NULL", "VAR_STRING", + "GEOMETRY", // TODO: support GEOMETRY later } dataTypeNumArr := []string{ From 55f9278aef4ce038efddb3fc631f399ee84c90fd Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 30 Apr 2021 08:58:15 +0800 Subject: [PATCH 11/11] address comments --- tests/primary_key/data/pk_case_3.sql | 24 ++++++++++++------------ tests/primary_key/result/pk_case_3.sql | 22 +++++++++++----------- v4/export/dump.go | 8 ++++---- v4/export/sql.go | 1 - v4/export/sql_type.go | 3 +-- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/tests/primary_key/data/pk_case_3.sql b/tests/primary_key/data/pk_case_3.sql index 142102ce..ab69e5b9 100644 --- a/tests/primary_key/data/pk_case_3.sql +++ b/tests/primary_key/data/pk_case_3.sql @@ -1,14 +1,14 @@ # test random order and no primary key -create table `pk_case_3` (a int, b int); +create table `pk_case_3` (a int, b int, g geometry); insert into `pk_case_3` values -(6, 4), -(4, 6), -(8, 2), -(3, 7), -(1, 9), -(2, 8), -(5, 5), -(10, 0), -(0, 10), -(9, 1), -(7, 3); +(6, 4, ST_GeomFromText('POINT(1 1)')), +(4, 6, ST_GeomFromText('LINESTRING(2 1, 6 6)')), +(8, 2, NULL), +(3, 7, NULL), +(1, 9, NULL), +(2, 8, NULL), +(5, 5, NULL), +(10, 0, NULL), +(0, 10, NULL), +(9, 1, NULL), +(7, 3, NULL); diff --git a/tests/primary_key/result/pk_case_3.sql b/tests/primary_key/result/pk_case_3.sql index 0698d1a1..a29e568d 100644 --- a/tests/primary_key/result/pk_case_3.sql +++ b/tests/primary_key/result/pk_case_3.sql @@ -1,13 +1,13 @@ /*!40101 SET NAMES binary*/; INSERT INTO `pk_case_3` VALUES -(6,4), -(4,6), -(8,2), -(3,7), -(1,9), -(2,8), -(5,5), -(10,0), -(0,10), -(9,1), -(7,3); +(6,4,x'000000000101000000000000000000f03f000000000000f03f'), +(4,6,x'000000000102000000020000000000000000000040000000000000f03f00000000000018400000000000001840'), +(8,2,NULL), +(3,7,NULL), +(1,9,NULL), +(2,8,NULL), +(5,5,NULL), +(10,0,NULL), +(0,10,NULL), +(9,1,NULL), +(7,3,NULL); diff --git a/v4/export/dump.go b/v4/export/dump.go index 698bb8d6..1ad1ca61 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -610,7 +610,7 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn totalChunk := 0 cachedHandleVals := make([][][]string, len(partitions)) - handleColNames, _, err := selectTiDBPrimaryKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields) + handleColNames, _, err := selectTiDBRowKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields) if err != nil { return err } @@ -665,7 +665,7 @@ func (d *Dumper) L() log.Logger { } func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, pkColTypes, err := selectTiDBPrimaryKeyFields(conn, dbName, tableName, nil) + pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, dbName, tableName, nil) if err != nil { return nil, nil, errors.Trace(err) } @@ -709,7 +709,7 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks) } -func selectTiDBPrimaryKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { +func selectTiDBRowKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) if err != nil { return @@ -739,7 +739,7 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) { } func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, _, err = selectTiDBPrimaryKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) + pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) if err != nil { return } diff --git a/v4/export/sql.go b/v4/export/sql.go index eb0bb1b2..655d9c2b 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -470,7 +470,6 @@ func GetSpecifiedColumnValueAndClose(rows *sql.Rows, columnName string) ([]strin strs = append(strs, oneRow[fieldIndex].String) } } - rows.Close() return strs, errors.Trace(rows.Err()) } diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index dabb557b..69450961 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -29,7 +29,6 @@ func initColTypeRowReceiverMap() { "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "ENUM", "SET", "JSON", "NULL", "VAR_STRING", - "GEOMETRY", // TODO: support GEOMETRY later } dataTypeNumArr := []string{ @@ -43,7 +42,7 @@ func initColTypeRowReceiverMap() { dataTypeBinArr := []string{ "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", "BINARY", "VARBINARY", - "BIT", + "BIT", "GEOMETRY", } for _, s := range dataTypeStringArr {