Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dumpling: check table-list types before dumping #53683

Merged
merged 5 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,9 +1145,24 @@ func getListTableTypeByConf(conf *Config) listTableType {
}

func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error {
if conf.SpecifiedTables || conf.SQL != "" {
if conf.SQL != "" {
return nil
}

ifSeqExists, err := CheckIfSeqExists(db)
if err != nil {
return err
}
var listType listTableType
if ifSeqExists {
listType = listTableByShowFullTables
} else {
listType = getListTableTypeByConf(conf)
}

if conf.SpecifiedTables {
return updateSpecifiedTablesMeta(tctx, db, conf.Tables, listType)
}
databases, err := prepareDumpingDatabases(tctx, conf, db)
if err != nil {
return err
Expand All @@ -1161,17 +1176,6 @@ func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn)
tableTypes = append(tableTypes, TableTypeSequence)
}

ifSeqExists, err := CheckIfSeqExists(db)
if err != nil {
return err
}
var listType listTableType
if ifSeqExists {
listType = listTableByShowFullTables
} else {
listType = getListTableTypeByConf(conf)
}

conf.Tables, err = ListAllDatabasesTables(tctx, db, databases, listType, tableTypes...)
if err != nil {
return err
Expand Down
113 changes: 113 additions & 0 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,119 @@ func RestoreCharset(w io.StringWriter) {
_, _ = w.WriteString("SET collation_connection = @PREV_COLLATION_CONNECTION;\n")
}

// updateSpecifiedTablesMeta updates DatabaseTables with correct table type and avg row size.
func updateSpecifiedTablesMeta(tctx *tcontext.Context, db *sql.Conn, dbTables DatabaseTables, listType listTableType) error {
var (
schema, table, tableTypeStr string
tableType TableType
avgRowLength uint64
err error
)
switch listType {
case listTableByInfoSchema:
dbNames := make([]string, 0, len(dbTables))
for db := range dbTables {
dbNames = append(dbNames, fmt.Sprintf("'%s'", db))
}
query := fmt.Sprintf("SELECT TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,AVG_ROW_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA IN (%s)", strings.Join(dbNames, ","))
if err := simpleQueryWithArgs(tctx, db, func(rows *sql.Rows) error {
var (
sqlAvgRowLength sql.NullInt64
err2 error
)
if err2 = rows.Scan(&schema, &table, &tableTypeStr, &sqlAvgRowLength); err != nil {
return errors.Trace(err2)
}

tbls, ok := dbTables[schema]
if !ok {
return nil
}
for _, tbl := range tbls {
if tbl.Name == table {
tableType, err2 = ParseTableType(tableTypeStr)
if err2 != nil {
return errors.Trace(err2)
}
if sqlAvgRowLength.Valid {
avgRowLength = uint64(sqlAvgRowLength.Int64)
} else {
avgRowLength = 0
}
tbl.Type = tableType
tbl.AvgRowLength = avgRowLength
}
}
return nil
}, query); err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
return nil
case listTableByShowFullTables:
for schema, tbls := range dbTables {
query := fmt.Sprintf("SHOW FULL TABLES FROM `%s`",
escapeString(schema))
if err := simpleQueryWithArgs(tctx, db, func(rows *sql.Rows) error {
var err2 error
if err2 = rows.Scan(&table, &tableTypeStr); err != nil {
return errors.Trace(err2)
}
for _, tbl := range tbls {
if tbl.Name == table {
tableType, err2 = ParseTableType(tableTypeStr)
if err2 != nil {
return errors.Trace(err2)
}
tbl.Type = tableType
}
}
return nil
}, query); err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
}
return nil
default:
const queryTemplate = "SHOW TABLE STATUS FROM `%s`"
for schema, tbls := range dbTables {
query := fmt.Sprintf(queryTemplate, escapeString(schema))
rows, err := db.QueryContext(tctx, query)
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
results, err := GetSpecifiedColumnValuesAndClose(rows, "NAME", "ENGINE", "AVG_ROW_LENGTH", "COMMENT")
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
for _, oneRow := range results {
table, engine, avgRowLengthStr, comment := oneRow[0], oneRow[1], oneRow[2], oneRow[3]
for _, tbl := range tbls {
if tbl.Name == table {
if avgRowLengthStr != "" {
avgRowLength, err = strconv.ParseUint(avgRowLengthStr, 10, 64)
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
} else {
avgRowLength = 0
}
tbl.AvgRowLength = avgRowLength
tableType = TableTypeBase
if engine == "" && (comment == "" || comment == TableTypeViewStr) {
tableType = TableTypeView
} else if engine == "" {
tctx.L().Warn("invalid table without engine found", zap.String("database", schema), zap.String("table", table))
continue
}
tbl.Type = tableType
}
}
}
}
return nil
}
}

// ListAllDatabasesTables lists all the databases and tables from the database
// listTableByInfoSchema list tables by table information_schema in MySQL
// listTableByShowTableStatus has better performance than listTableByInfoSchema
Expand Down
31 changes: 31 additions & 0 deletions dumpling/tests/specified_table_view/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh
#
# Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

set -eu
cur=$(cd `dirname $0`; pwd)

DB_NAME="specified_table_view"
TABLE_NAME="t"
VIEW_NAME="v"

run_sql "drop database if exists \`$DB_NAME\`;"
run_sql "create database \`$DB_NAME\`;"
run_sql "create table \`$DB_NAME\`.\`$TABLE_NAME\` (a int);"
run_sql "create view \`$DB_NAME\`.\`$VIEW_NAME\` as select * from \`$DB_NAME\`.\`$TABLE_NAME\`;"

set +e
rm -rf "$DUMPLING_OUTPUT_DIR"
run_dumpling --consistency=lock -T="$DB_NAME.$TABLE_NAME,$DB_NAME.$VIEW_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e

file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$TABLE_NAME-schema.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$VIEW_NAME-schema-view.sql"

set +e
rm -rf "$DUMPLING_OUTPUT_DIR"
run_dumpling --consistency=lock -T="$DB_NAME.$TABLE_NAME,$DB_NAME.$VIEW_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e

file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$TABLE_NAME-schema.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$VIEW_NAME-schema-view.sql"
Loading