diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index f3b9d45a5d989..854f63830b468 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -137,7 +137,7 @@ func NewCSVParser( if len(cfg.StartingBy) > 0 { if strings.Contains(cfg.StartingBy, terminator) { - return nil, errors.New("starting-by cannot contain (line) terminator") + return nil, errors.Errorf("STARTING BY '%s' cannot contain LINES TERMINATED BY '%s'", cfg.StartingBy, terminator) } } diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index 795b11f1c1c20..3e002eeef42ab 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -1346,7 +1346,7 @@ yyy",5,xx"xxxx,8 }, } _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil) - require.ErrorContains(t, err, "starting-by cannot contain (line) terminator") + require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain LINES TERMINATED BY '\n'") } func TestCharsetConversion(t *testing.T) { diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 2998098678494..7abbd9d0e814b 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -157,10 +157,10 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error { return errors.Trace(err) } if u.Scheme == "" { - return errors.Annotate(berrors.ErrStorageInvalidConfig, "scheme not found in endpoint") + return errors.Errorf("scheme not found in endpoint") } if u.Host == "" { - return errors.Annotate(berrors.ErrStorageInvalidConfig, "host not found in endpoint") + return errors.Errorf("host not found in endpoint") } } // In some cases, we need to set ForcePathStyle to false. diff --git a/errno/errcode.go b/errno/errcode.go index 0501bc0020e07..a165e2f6ded2e 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1045,6 +1045,15 @@ const ( ErrTempTableNotAllowedWithTTL = 8151 ErrUnsupportedTTLReferencedByFK = 8152 ErrUnsupportedPrimaryKeyTypeWithTTL = 8153 + ErrLoadDataFromServerDisk = 8154 + ErrLoadParquetFromLocal = 8155 + ErrLoadDataEmptyPath = 8156 + ErrLoadDataUnsupportedFormat = 8157 + ErrLoadDataInvalidURI = 8158 + ErrLoadDataCantAccess = 8159 + ErrLoadDataCantRead = 8160 + ErrLoadDataPhysicalImportTableNotEmpty = 8161 + ErrLoadDataWrongFormatConfig = 8162 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 1d5a5242ca227..b7272a4710e27 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1040,6 +1040,14 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrTempTableNotAllowedWithTTL: mysql.Message("Set TTL for temporary table is not allowed", nil), ErrUnsupportedTTLReferencedByFK: mysql.Message("Set TTL for a table referenced by foreign key is not allowed", nil), ErrUnsupportedPrimaryKeyTypeWithTTL: mysql.Message("Unsupported clustered primary key type FLOAT/DOUBLE for TTL", nil), + ErrLoadDataFromServerDisk: mysql.Message("Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first", nil), + ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. 
Please try to load the parquet files from the cloud storage", nil), + ErrLoadDataEmptyPath: mysql.Message("The value of INFILE must not be empty when LOAD DATA from LOCAL", nil), + ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), + ErrLoadDataInvalidURI: mysql.Message("The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), + ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. Please check the URI, access key and secret access key are correct", nil), + ErrLoadDataCantRead: mysql.Message("Failed to read source files. Reason: %s. %s", nil), + ErrLoadDataWrongFormatConfig: mysql.Message("", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/errors.toml b/errors.toml index 4bf81f9ef7866..bbc2ff8d78192 100644 --- a/errors.toml +++ b/errors.toml @@ -1646,6 +1646,46 @@ error = ''' transaction aborted because lazy uniqueness check is enabled and an error occurred: %s ''' +["executor:8154"] +error = ''' +Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '%s' needs to specify the clause of LOCAL first +''' + +["executor:8155"] +error = ''' +Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage +''' + +["executor:8156"] +error = ''' +The value of INFILE must not be empty when LOAD DATA from LOCAL +''' + +["executor:8157"] +error = ''' +The FORMAT '%s' is not supported +''' + +["executor:8158"] +error = ''' +The URI of INFILE is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' +''' + +["executor:8159"] +error = ''' +Access to the source file has been denied. Please check the URI, access key and secret access key are correct +''' + +["executor:8160"] +error = ''' +Failed to read source files. Reason: %s. 
%s +''' + +["executor:8162"] +error = ''' + +''' + ["executor:8212"] error = ''' Failed to split region ranges: %s diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 52d3fc5986b4b..4393a91b32e64 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -347,6 +347,7 @@ go_test( flaky = True, shard_count = 50, deps = [ + "//br/pkg/errors", "//config", "//ddl", "//ddl/placement", diff --git a/executor/builder.go b/executor/builder.go index 18ab4913aadd5..b38e5d85148bb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -938,46 +938,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID) return nil } - insertVal := &InsertValues{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), - Table: tbl, - Columns: v.Columns, - GenExprs: v.GenCols.Exprs, - isLoadData: true, - txnInUse: sync.Mutex{}, - } - loadDataInfo := &LoadDataInfo{ - row: make([]types.Datum, 0, len(insertVal.insertColumns)), - InsertValues: insertVal, - Path: v.Path, - Format: v.Format, - Table: tbl, - FieldsInfo: v.FieldsInfo, - LinesInfo: v.LinesInfo, - NullInfo: v.NullInfo, - IgnoreLines: v.IgnoreLines, - ColumnAssignments: v.ColumnAssignments, - ColumnsAndUserVars: v.ColumnsAndUserVars, - OnDuplicate: v.OnDuplicate, - Ctx: b.ctx, - } - columnNames := loadDataInfo.initFieldMappings() - err := loadDataInfo.initLoadColumns(columnNames) + if !tbl.Meta().IsBaseTable() { + b.err = plannercore.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "LOAD") + return nil + } + + worker, err := NewLoadDataWorker(b.ctx, v, tbl) if err != nil { b.err = err return nil } - loadDataExec := &LoadDataExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), - FileLocRef: v.FileLocRef, - OnDuplicate: v.OnDuplicate, - loadDataInfo: loadDataInfo, - } - var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr - loadDataExec.loadDataInfo.initQueues() - loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt) - return loadDataExec + return &LoadDataExec{ + baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), + FileLocRef: v.FileLocRef, + OnDuplicate: v.OnDuplicate, + loadDataWorker: worker, + } } func (b *executorBuilder) buildLoadStats(v *plannercore.LoadStats) Executor { diff --git a/executor/errors.go b/executor/errors.go index 565a712d1c7d9..45c6916bbe146 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -76,4 +76,13 @@ var ( errUnsupportedFlashbackTmpTable = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Recover/flashback table is not supported on temporary tables", nil)) errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil)) ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword) + + ErrLoadDataFromServerDisk = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk) + ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal) + ErrLoadDataEmptyPath = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath) + ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat) + ErrLoadDataInvalidURI = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI) + ErrLoadDataCantAccess = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess) + ErrLoadDataCantRead = 
dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead) + ErrLoadDataWrongFormatConfig = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig) ) diff --git a/executor/load_data.go b/executor/load_data.go index 5d206bd0149bf..2720d182d6b69 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/table" @@ -43,12 +44,13 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( // LoadDataFormatSQLDump represents the data source file of LOAD DATA is // mydumper-format DML file - LoadDataFormatSQLDump = "sqldumpfile" + LoadDataFormatSQLDump = "sql file" // LoadDataFormatParquet represents the data source file of LOAD DATA is // parquet LoadDataFormatParquet = "parquet" @@ -64,59 +66,56 @@ var ( type LoadDataExec struct { baseExecutor - FileLocRef ast.FileLocRefTp - OnDuplicate ast.OnDuplicateKeyHandlingType - loadDataInfo *LoadDataInfo + FileLocRef ast.FileLocRefTp + OnDuplicate ast.OnDuplicateKeyHandlingType + loadDataWorker *LoadDataWorker } // Next implements the Executor Next interface. func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) - if e.loadDataInfo.Path == "" { - return errors.New("Load Data: infile path is empty") - } - if !e.loadDataInfo.Table.Meta().IsBaseTable() { - return errors.New("can only load data into base tables") + if e.loadDataWorker.Path == "" { + return ErrLoadDataEmptyPath } // CSV-like - if e.loadDataInfo.Format == "" { - if e.loadDataInfo.NullInfo != nil && e.loadDataInfo.NullInfo.OptEnclosed && - (e.loadDataInfo.FieldsInfo == nil || e.loadDataInfo.FieldsInfo.Enclosed == nil) { - return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") + if e.loadDataWorker.format == "" { + if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed && + (e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) { + return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED") } // TODO: support lines terminated is "". 
- if len(e.loadDataInfo.LinesInfo.Terminated) == 0 { - return errors.New("Load Data: don't support load data terminated is nil") + if len(e.loadDataWorker.LinesInfo.Terminated) == 0 { + return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty") } } switch e.FileLocRef { case ast.FileLocServerOrRemote: - u, err := storage.ParseRawURL(e.loadDataInfo.Path) + u, err := storage.ParseRawURL(e.loadDataWorker.Path) if err != nil { - return err + return ErrLoadDataInvalidURI.GenWithStackByArgs(err.Error()) } var filename string u.Path, filename = filepath.Split(u.Path) b, err := storage.ParseBackendFromURL(u, nil) if err != nil { - return err + return ErrLoadDataInvalidURI.GenWithStackByArgs(getMsgFromBRError(err)) } if b.GetLocal() != nil { - return errors.Errorf("Load Data: don't support load data from tidb-server's disk") + return ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.Path) } return e.loadFromRemote(ctx, b, filename) case ast.FileLocClient: // let caller use handleQuerySpecial to read data in this connection - sctx := e.loadDataInfo.ctx + sctx := e.loadDataWorker.ctx val := sctx.Value(LoadDataVarKey) if val != nil { sctx.SetValue(LoadDataVarKey, nil) - return errors.New("Load Data: previous load data option wasn't closed normally") + return errors.New("previous load data option wasn't closed normally") } - sctx.SetValue(LoadDataVarKey, e.loadDataInfo) + sctx.SetValue(LoadDataVarKey, e.loadDataWorker) } return nil } @@ -132,36 +131,36 @@ func (e *LoadDataExec) loadFromRemote( } s, err := storage.New(ctx, b, opt) if err != nil { - return err + return ErrLoadDataCantAccess } fileReader, err := s.Open(ctx, filename) if err != nil { - return err + return ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err), "Please check the INFILE path is correct") } defer fileReader.Close() - e.loadDataInfo.loadRemoteInfo = loadRemoteInfo{ + e.loadDataWorker.loadRemoteInfo = loadRemoteInfo{ store: s, path: filename, } - return e.loadDataInfo.Load(ctx, fileReader) + return e.loadDataWorker.Load(ctx, fileReader) } // Close implements the Executor Close interface. func (e *LoadDataExec) Close() error { - if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil { - defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataInfo.stats) + if e.runtimeStats != nil && e.loadDataWorker != nil && e.loadDataWorker.stats != nil { + defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataWorker.stats) } return nil } // Open implements the Executor Open interface. func (e *LoadDataExec) Open(ctx context.Context) error { - if e.loadDataInfo.insertColumns != nil { - e.loadDataInfo.initEvalBuffer() + if e.loadDataWorker.insertColumns != nil { + e.loadDataWorker.initEvalBuffer() } // Init for runtime stats. - e.loadDataInfo.collectRuntimeStatsEnabled() + e.loadDataWorker.collectRuntimeStatsEnabled() return nil } @@ -176,33 +175,69 @@ type loadRemoteInfo struct { path string } -// LoadDataInfo saves the information of loading data operation. -// TODO: rename it and remove unnecessary public methods. -type LoadDataInfo struct { +// LoadDataWorker does a LOAD DATA job. 
+type LoadDataWorker struct { *InsertValues - row []types.Datum - Path string - Format string - Table table.Table + Path string + Ctx sessionctx.Context + // expose some fields for test FieldsInfo *ast.FieldsClause LinesInfo *ast.LinesClause NullInfo *ast.NullDefinedBy IgnoreLines uint64 - Ctx sessionctx.Context - rows [][]types.Datum - Drained bool - ColumnAssignments []*ast.Assignment - ColumnsAndUserVars []*ast.ColumnNameOrUserVar - FieldMappings []*FieldMapping + format string + columnAssignments []*ast.Assignment + columnsAndUserVars []*ast.ColumnNameOrUserVar + fieldMappings []*FieldMapping + onDuplicate ast.OnDuplicateKeyHandlingType + table table.Table + row []types.Datum + rows [][]types.Datum commitTaskQueue chan commitTask - StopCh chan struct{} - QuitCh chan struct{} - OnDuplicate ast.OnDuplicateKeyHandlingType - - loadRemoteInfo loadRemoteInfo + loadRemoteInfo loadRemoteInfo +} + +// NewLoadDataWorker creates a new LoadDataWorker that is ready to work. +func NewLoadDataWorker( + sctx sessionctx.Context, + plan *plannercore.LoadData, + tbl table.Table, +) (*LoadDataWorker, error) { + insertVal := &InsertValues{ + baseExecutor: newBaseExecutor(sctx, nil, plan.ID()), + Table: tbl, + Columns: plan.Columns, + GenExprs: plan.GenCols.Exprs, + isLoadData: true, + txnInUse: sync.Mutex{}, + maxRowsInBatch: uint64(sctx.GetSessionVars().DMLBatchSize), + } + loadDataWorker := &LoadDataWorker{ + row: make([]types.Datum, 0, len(insertVal.insertColumns)), + commitTaskQueue: make(chan commitTask, taskQueueSize), + InsertValues: insertVal, + Path: plan.Path, + format: plan.Format, + table: tbl, + FieldsInfo: plan.FieldsInfo, + LinesInfo: plan.LinesInfo, + NullInfo: plan.NullInfo, + IgnoreLines: plan.IgnoreLines, + columnAssignments: plan.ColumnAssignments, + columnsAndUserVars: plan.ColumnsAndUserVars, + onDuplicate: plan.OnDuplicate, + Ctx: sctx, + } + columnNames := loadDataWorker.initFieldMappings() + err := loadDataWorker.initLoadColumns(columnNames) + if err != nil { + return nil, err + } + loadDataWorker.ResetBatch() + return loadDataWorker, nil } // FieldMapping indicates the relationship between input field and table column or user variable @@ -211,152 +246,59 @@ type FieldMapping struct { UserVar *ast.VariableExpr } -// Load reads from readerFn and do load data job. -func (e *LoadDataInfo) Load(ctx context.Context, reader io.ReadSeekCloser) error { - e.initQueues() - e.SetMaxRowsInBatch(uint64(e.Ctx.GetSessionVars().DMLBatchSize)) - e.startStopWatcher() - // let stop watcher goroutine quit - defer e.forceQuit() - err := sessiontxn.NewTxn(ctx, e.Ctx) - if err != nil { - return err - } - // processStream process input data, enqueue commit task - wg := new(sync.WaitGroup) - wg.Add(1) - go processStream(ctx, reader, e, wg) - err = e.commitWork(ctx) - wg.Wait() - return err -} +// initFieldMappings make a field mapping slice to implicitly map input field to table column or user defined variable +// the slice's order is the same as the order of the input fields. +// Returns a slice of same ordered column names without user defined variable names. 
+func (e *LoadDataWorker) initFieldMappings() []string { + columns := make([]string, 0, len(e.columnsAndUserVars)+len(e.columnAssignments)) + tableCols := e.table.Cols() -// processStream process input stream from network -func processStream(ctx context.Context, reader io.ReadSeekCloser, loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) { - var ( - parser mydump.Parser - err error - ) - defer func() { - r := recover() - if r != nil { - logutil.Logger(ctx).Error("process routine panicked", - zap.Reflect("r", r), - zap.Stack("stack")) - } - if err != nil { - logutil.Logger(ctx).Error("process routine meet error", - zap.Error(err)) - } - if err != nil || r != nil { - loadDataInfo.forceQuit() - } else { - loadDataInfo.CloseTaskQueue() + if len(e.columnsAndUserVars) == 0 { + for _, v := range tableCols { + fieldMapping := &FieldMapping{ + Column: v, + } + e.fieldMappings = append(e.fieldMappings, fieldMapping) + columns = append(columns, v.Name.O) } - wg.Done() - }() - switch strings.ToLower(loadDataInfo.Format) { - case "": - // CSV-like - parser, err = mydump.NewCSVParser( - ctx, - loadDataInfo.GenerateCSVConfig(), - reader, - int64(config.ReadBlockSize), - nil, - false, - // TODO: support charset conversion - nil) - case LoadDataFormatSQLDump: - parser = mydump.NewChunkParser( - ctx, - loadDataInfo.Ctx.GetSessionVars().SQLMode, - reader, - int64(config.ReadBlockSize), - nil, - ) - case LoadDataFormatParquet: - if loadDataInfo.loadRemoteInfo.store == nil { - err = errors.New("parquet format requires remote storage") - return - } - parser, err = mydump.NewParquetParser( - ctx, - loadDataInfo.loadRemoteInfo.store, - reader, - loadDataInfo.loadRemoteInfo.path, - ) - default: - err = errors.Errorf("unsupported format: %s", loadDataInfo.Format) - } - if err != nil { - return + return columns } - parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) + var column *table.Column - for { - // prepare batch and enqueue task - err = loadDataInfo.ReadRows(ctx, parser) - if err != nil { - logutil.Logger(ctx).Error("load data process stream error in ReadRows", zap.Error(err)) - return - } - if loadDataInfo.curBatchCnt == 0 { - return - } - if err = loadDataInfo.enqOneTask(ctx); err != nil { - logutil.Logger(ctx).Error("load data process stream error in enqOneTask", zap.Error(err)) - return + for _, v := range e.columnsAndUserVars { + if v.ColumnName != nil { + column = table.FindCol(tableCols, v.ColumnName.Name.O) + columns = append(columns, v.ColumnName.Name.O) + } else { + column = nil } - } -} -// reorderColumns reorder the e.insertColumns according to the order of columnNames -// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name. -func (e *LoadDataInfo) reorderColumns(columnNames []string) error { - cols := e.insertColumns - - if len(cols) != len(columnNames) { - return ErrColumnsNotMatched - } - - reorderedColumns := make([]*table.Column, len(cols)) - - if columnNames == nil { - return nil - } - - mapping := make(map[string]int) - for idx, colName := range columnNames { - mapping[strings.ToLower(colName)] = idx - } - - for _, col := range cols { - idx := mapping[col.Name.L] - reorderedColumns[idx] = col + fieldMapping := &FieldMapping{ + Column: column, + UserVar: v.UserVar, + } + e.fieldMappings = append(e.fieldMappings, fieldMapping) } - e.insertColumns = reorderedColumns - - return nil + return columns } // initLoadColumns sets columns which the input fields loaded to. 
-func (e *LoadDataInfo) initLoadColumns(columnNames []string) error { +func (e *LoadDataWorker) initLoadColumns(columnNames []string) error { var cols []*table.Column var missingColName string var err error - tableCols := e.Table.Cols() + tableCols := e.table.Cols() if len(columnNames) != len(tableCols) { - for _, v := range e.ColumnAssignments { + for _, v := range e.columnAssignments { columnNames = append(columnNames, v.Column.Name.O) } } - cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle) + cols, missingColName = table.FindCols(tableCols, columnNames, e.table.Meta().PKIsHandle) if missingColName != "" { return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list") } @@ -390,104 +332,189 @@ func (e *LoadDataInfo) initLoadColumns(columnNames []string) error { return nil } -// initFieldMappings make a field mapping slice to implicitly map input field to table column or user defined variable -// the slice's order is the same as the order of the input fields. -// Returns a slice of same ordered column names without user defined variable names. -func (e *LoadDataInfo) initFieldMappings() []string { - columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments)) - tableCols := e.Table.Cols() - - if len(e.ColumnsAndUserVars) == 0 { - for _, v := range tableCols { - fieldMapping := &FieldMapping{ - Column: v, - } - e.FieldMappings = append(e.FieldMappings, fieldMapping) - columns = append(columns, v.Name.O) - } +// reorderColumns reorder the e.insertColumns according to the order of columnNames +// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name. +func (e *LoadDataWorker) reorderColumns(columnNames []string) error { + cols := e.insertColumns - return columns + if len(cols) != len(columnNames) { + return ErrColumnsNotMatched } - var column *table.Column + reorderedColumns := make([]*table.Column, len(cols)) - for _, v := range e.ColumnsAndUserVars { - if v.ColumnName != nil { - column = table.FindCol(tableCols, v.ColumnName.Name.O) - columns = append(columns, v.ColumnName.Name.O) - } else { - column = nil - } + if columnNames == nil { + return nil + } - fieldMapping := &FieldMapping{ - Column: column, - UserVar: v.UserVar, - } - e.FieldMappings = append(e.FieldMappings, fieldMapping) + mapping := make(map[string]int) + for idx, colName := range columnNames { + mapping[strings.ToLower(colName)] = idx } - return columns -} + for _, col := range cols { + idx := mapping[col.Name.L] + reorderedColumns[idx] = col + } -// GetRows getter for rows -func (e *LoadDataInfo) GetRows() [][]types.Datum { - return e.rows -} + e.insertColumns = reorderedColumns -// GetCurBatchCnt getter for curBatchCnt -func (e *LoadDataInfo) GetCurBatchCnt() uint64 { - return e.curBatchCnt + return nil } -// CloseTaskQueue preparing routine to inform commit routine no more data -func (e *LoadDataInfo) CloseTaskQueue() { - close(e.commitTaskQueue) -} +// Load reads from readerFn and do load data job. 
+func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) error { + var ( + parser mydump.Parser + err error + ) -// initQueues initialize task queue and error report queue -func (e *LoadDataInfo) initQueues() { - e.commitTaskQueue = make(chan commitTask, taskQueueSize) - e.StopCh = make(chan struct{}, 2) - e.QuitCh = make(chan struct{}) + switch strings.ToLower(e.format) { + case "": + // CSV-like + parser, err = mydump.NewCSVParser( + ctx, + e.GenerateCSVConfig(), + reader, + int64(config.ReadBlockSize), + nil, + false, + // TODO: support charset conversion + nil) + case LoadDataFormatSQLDump: + parser = mydump.NewChunkParser( + ctx, + e.Ctx.GetSessionVars().SQLMode, + reader, + int64(config.ReadBlockSize), + nil, + ) + case LoadDataFormatParquet: + if e.loadRemoteInfo.store == nil { + return ErrLoadParquetFromLocal + } + parser, err = mydump.NewParquetParser( + ctx, + e.loadRemoteInfo.store, + reader, + e.loadRemoteInfo.path, + ) + default: + return ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.format) + } + if err != nil { + return ErrLoadDataWrongFormatConfig.GenWithStack(err.Error()) + } + parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) + + err = sessiontxn.NewTxn(ctx, e.Ctx) + if err != nil { + return err + } + group, groupCtx := errgroup.WithContext(ctx) + + // processStream process input data, enqueue commit task + group.Go(func() error { + return e.processStream(groupCtx, parser) + }) + group.Go(func() error { + return e.commitWork(groupCtx) + }) + + err = group.Wait() + e.SetMessage() + return err } -// startStopWatcher monitor StopCh to force quit -func (e *LoadDataInfo) startStopWatcher() { - go func() { - <-e.StopCh - close(e.QuitCh) +// processStream process input stream from network +func (e *LoadDataWorker) processStream( + ctx context.Context, + parser mydump.Parser, +) (err error) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("process routine panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + err = errors.Errorf("%v", r) + } }() -} -// forceQuit let commit quit directly -func (e *LoadDataInfo) forceQuit() { - e.StopCh <- struct{}{} + checkKilled := time.NewTicker(30 * time.Second) + defer checkKilled.Stop() + + for { + // prepare batch and enqueue task + if err = e.ReadRows(ctx, parser); err != nil { + return + } + if e.curBatchCnt == 0 { + close(e.commitTaskQueue) + return + } + + TrySendTask: + select { + case <-ctx.Done(): + return ctx.Err() + case <-checkKilled.C: + if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) { + logutil.Logger(ctx).Info("load data query interrupted quit data processing") + close(e.commitTaskQueue) + return ErrQueryInterrupted + } + goto TrySendTask + case e.commitTaskQueue <- commitTask{e.curBatchCnt, e.rows}: + } + // reset rows buffer, will reallocate buffer but NOT reuse + e.ResetBatch() + } } -// makeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt -func (e *LoadDataInfo) makeCommitTask() commitTask { - return commitTask{e.curBatchCnt, e.rows} +// ResetBatch reset the inner batch. 
+func (e *LoadDataWorker) ResetBatch() { + e.rows = make([][]types.Datum, 0, e.maxRowsInBatch) + e.curBatchCnt = 0 } -// enqOneTask feed one batch commit task to commit work -func (e *LoadDataInfo) enqOneTask(ctx context.Context) error { - var err error - if e.curBatchCnt > 0 { +// commitWork commit batch sequentially +func (e *LoadDataWorker) commitWork(ctx context.Context) (err error) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("commitWork panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + err = errors.Errorf("%v", r) + } + }() + + var tasks uint64 + for { select { - case e.commitTaskQueue <- e.makeCommitTask(): - case <-e.QuitCh: - err = errors.New("enqOneTask forced to quit") - logutil.Logger(ctx).Error("enqOneTask forced to quit, possible commitWork error") - return err + case <-ctx.Done(): + return ctx.Err() + case task, ok := <-e.commitTaskQueue: + if !ok { + return nil + } + start := time.Now() + if err = e.commitOneTask(ctx, task); err != nil { + return err + } + tasks++ + logutil.Logger(ctx).Info("commit one task success", + zap.Duration("commit time usage", time.Since(start)), + zap.Uint64("keys processed", task.cnt), + zap.Uint64("tasks processed", tasks), + zap.Int("tasks in queue", len(e.commitTaskQueue))) } - // reset rows buffer, will reallocate buffer but NOT reuse - e.SetMaxRowsInBatch(e.maxRowsInBatch) } - return err } -// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn -func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task commitTask) error { +// commitOneTask insert Data from LoadDataWorker.rows, then make commit and refresh txn +func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) error { var err error defer func() { if err != nil { @@ -514,66 +541,44 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task commitTask) error return err } -// commitWork commit batch sequentially -func (e *LoadDataInfo) commitWork(ctx context.Context) error { +// CheckAndInsertOneBatch is used to commit one transaction batch fulfilled data +func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { + if e.stats != nil && e.stats.BasicRuntimeStats != nil { + // Since this method will not call by executor Next, + // so we need record the basic executor runtime stats by ourselves. 
+ start := time.Now() + defer func() { + e.stats.BasicRuntimeStats.Record(time.Since(start), 0) + }() + } var err error - defer func() { - r := recover() - if r != nil { - logutil.Logger(ctx).Error("commitWork panicked", - zap.Reflect("r", r), - zap.Stack("stack")) - } - if err != nil || r != nil { - e.forceQuit() - } - if err != nil { - e.ctx.StmtRollback(ctx, false) - } - }() - var tasks uint64 - var end = false - for !end { - select { - case <-e.QuitCh: - err = errors.New("commit forced to quit") - logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed") - return err - case commitTask, ok := <-e.commitTaskQueue: - if ok { - start := time.Now() - err = e.CommitOneTask(ctx, commitTask) - if err != nil { - break - } - tasks++ - logutil.Logger(ctx).Info("commit one task success", - zap.Duration("commit time usage", time.Since(start)), - zap.Uint64("keys processed", commitTask.cnt), - zap.Uint64("tasks processed", tasks), - zap.Int("tasks in queue", len(e.commitTaskQueue))) - } else { - end = true - } - } - if err != nil { - logutil.Logger(ctx).Error("load data commit work error", zap.Error(err)) - break - } - if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) { - logutil.Logger(ctx).Info("load data query interrupted quit data processing") - err = ErrQueryInterrupted - break - } + if cnt == 0 { + return err + } + e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) + + replace := false + if e.onDuplicate == ast.OnDuplicateKeyHandlingReplace { + replace = true + } + + err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, replace) + if err != nil { + return err } return err } -// SetMaxRowsInBatch sets the max number of rows to insert in a batch. -func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { - e.maxRowsInBatch = limit - e.rows = make([][]types.Datum, 0, limit) - e.curBatchCnt = 0 +func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error { + if row == nil { + return nil + } + err := e.addRecord(ctx, row) + if err != nil { + e.handleWarning(err) + return err + } + return nil } // ReadRows reads rows from parser. When parser's reader meet EOF, it will return @@ -581,7 +586,7 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { // will also return nil. // The result rows are saved in e.rows and update some members, caller can check // if curBatchCnt == 0 to know if reached EOF. -func (e *LoadDataInfo) ReadRows(ctx context.Context, parser mydump.Parser) error { +func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) error { ignoreOneLineFn := parser.ReadRow if csvParser, ok := parser.(*mydump.CSVParser); ok { ignoreOneLineFn = func() error { @@ -606,7 +611,10 @@ func (e *LoadDataInfo) ReadRows(ctx context.Context, parser mydump.Parser) error if errors.Cause(err) == io.EOF { return nil } - return err + return ErrLoadDataCantRead.GenWithStackByArgs( + err.Error(), + "Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)", + ) } // rowCount will be used in fillRow(), last insert ID will be assigned according to the rowCount = 1. // So should add first here. 
@@ -621,48 +629,8 @@ func (e *LoadDataInfo) ReadRows(ctx context.Context, parser mydump.Parser) error } } -// CheckAndInsertOneBatch is used to commit one transaction batch full filled data -func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { - if e.stats != nil && e.stats.BasicRuntimeStats != nil { - // Since this method will not call by executor Next, - // so we need record the basic executor runtime stats by ourself. - start := time.Now() - defer func() { - e.stats.BasicRuntimeStats.Record(time.Since(start), 0) - }() - } - var err error - if cnt == 0 { - return err - } - e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) - - replace := false - if e.OnDuplicate == ast.OnDuplicateKeyHandlingReplace { - replace = true - } - - err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, replace) - if err != nil { - return err - } - return err -} - -// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that -// LOAD statement is handled. -func (e *LoadDataInfo) SetMessage() { - stmtCtx := e.ctx.GetSessionVars().StmtCtx - numRecords := stmtCtx.RecordRows() - numDeletes := stmtCtx.DeletedRows() - numSkipped := numRecords - stmtCtx.CopiedRows() - numWarnings := stmtCtx.WarningCount() - msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) - e.ctx.GetSessionVars().StmtCtx.SetMessage(msg) -} - // colsToRow encodes the data of parser output. -func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum { +func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum { row := make([]types.Datum, 0, len(e.insertColumns)) sessionVars := e.Ctx.GetSessionVars() setVar := func(name string, col *types.Datum) { @@ -676,16 +644,16 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []type } } - for i := 0; i < len(e.FieldMappings); i++ { + for i := 0; i < len(e.fieldMappings); i++ { if i >= len(cols) { - if e.FieldMappings[i].Column == nil { - setVar(e.FieldMappings[i].UserVar.Name, nil) + if e.fieldMappings[i].Column == nil { + setVar(e.fieldMappings[i].UserVar.Name, nil) continue } // If some columns is missing and their type is time and has not null flag, they should be set as current time. 
- if types.IsTypeTime(e.FieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(e.FieldMappings[i].Column.GetFlag()) { - row = append(row, types.NewTimeDatum(types.CurrentTime(e.FieldMappings[i].Column.GetType()))) + if types.IsTypeTime(e.fieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(e.fieldMappings[i].Column.GetFlag()) { + row = append(row, types.NewTimeDatum(types.CurrentTime(e.fieldMappings[i].Column.GetType()))) continue } @@ -693,8 +661,8 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []type continue } - if e.FieldMappings[i].Column == nil { - setVar(e.FieldMappings[i].UserVar.Name, &cols[i]) + if e.fieldMappings[i].Column == nil { + setVar(e.fieldMappings[i].UserVar.Name, &cols[i]) continue } @@ -705,9 +673,9 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []type row = append(row, cols[i]) } - for i := 0; i < len(e.ColumnAssignments); i++ { + for i := 0; i < len(e.columnAssignments); i++ { // eval expression of `SET` clause - d, err := expression.EvalAstExpr(e.Ctx, e.ColumnAssignments[i].Expr) + d, err := expression.EvalAstExpr(e.Ctx, e.columnAssignments[i].Expr) if err != nil { e.handleWarning(err) return nil @@ -725,20 +693,20 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []types.Datum) []type return newRow } -func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) error { - if row == nil { - return nil - } - err := e.addRecord(ctx, row) - if err != nil { - e.handleWarning(err) - return err - } - return nil +// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that +// LOAD statement is handled. +func (e *LoadDataWorker) SetMessage() { + stmtCtx := e.ctx.GetSessionVars().StmtCtx + numRecords := stmtCtx.RecordRows() + numDeletes := stmtCtx.DeletedRows() + numSkipped := numRecords - stmtCtx.CopiedRows() + numWarnings := stmtCtx.WarningCount() + msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) + e.ctx.GetSessionVars().StmtCtx.SetMessage(msg) } -// GenerateCSVConfig generates a CSV config for parser from LoadDataInfo. -func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig { +// GenerateCSVConfig generates a CSV config for parser from LoadDataWorker. +func (e *LoadDataWorker) GenerateCSVConfig() *config.CSVConfig { var ( nullDef []string quotedNullIsText = true @@ -780,6 +748,16 @@ func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig { } } +// GetRows getter for rows +func (e *LoadDataWorker) GetRows() [][]types.Datum { + return e.rows +} + +// GetCurBatchCnt getter for curBatchCnt +func (e *LoadDataWorker) GetCurBatchCnt() uint64 { + return e.curBatchCnt +} + var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil) // SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on ReadCloser. 
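
Note on the concurrency change in executor/load_data.go above: the old StopCh/QuitCh/forceQuit signalling between processStream and commitWork is replaced by golang.org/x/sync/errgroup, so the first error (or a kill) cancels a shared context and both goroutines stop, and the producer closes commitTaskQueue to signal normal end of input. Below is a minimal, self-contained sketch of that producer/consumer shape; it is not TiDB code, and batch, the parse loop, and the printed "commit" are placeholders for commitTask, ReadRows, and commitOneTask.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type batch struct{ rows int }

func run(ctx context.Context, batches []batch) error {
	taskCh := make(chan batch, 16)
	group, gCtx := errgroup.WithContext(ctx)

	// Producer (like processStream): enqueue batches, close the queue when input ends.
	group.Go(func() error {
		defer close(taskCh)
		for _, b := range batches {
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case taskCh <- b:
			}
		}
		return nil
	})

	// Consumer (like commitWork): commit batches sequentially until the queue is closed.
	group.Go(func() error {
		for {
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case b, ok := <-taskCh:
				if !ok {
					return nil
				}
				fmt.Println("commit", b.rows, "rows")
			}
		}
	})

	// Wait returns the first error; the shared context cancellation stops the other goroutine.
	return group.Wait()
}

func main() {
	if err := run(context.Background(), []batch{{rows: 100}, {rows: 42}}); err != nil {
		fmt.Println("load failed:", err)
	}
}
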
diff --git a/executor/loadremotetest/BUILD.bazel b/executor/loadremotetest/BUILD.bazel index 57ddea3ad19ea..aa5279fac652c 100644 --- a/executor/loadremotetest/BUILD.bazel +++ b/executor/loadremotetest/BUILD.bazel @@ -4,6 +4,7 @@ go_test( name = "loadremotetest_test", timeout = "short", srcs = [ + "error_test.go", "main_test.go", "one_csv_test.go", "one_parquet_test.go", @@ -15,8 +16,11 @@ go_test( deps = [ "//executor", "//kv", + "//parser/terror", "//testkit", "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", "@org_uber_go_goleak//:goleak", ], diff --git a/executor/loadremotetest/error_test.go b/executor/loadremotetest/error_test.go new file mode 100644 index 0000000000000..c47ae6f6764be --- /dev/null +++ b/executor/loadremotetest/error_test.go @@ -0,0 +1,117 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package loadremotetest + +import ( + "fmt" + "testing" + + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/terror" + "github.com/stretchr/testify/require" +) + +func checkClientErrorMessage(t *testing.T, err error, msg string) { + require.Error(t, err) + cause := errors.Cause(err) + terr, ok := cause.(*errors.Error) + require.True(t, ok, "%T", cause) + require.Contains(t, terror.ToSQLError(terr).Error(), msg) +} + +func (s *mockGCSSuite) TestErrorMessage() { + s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;") + + err := s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t") + checkClientErrorMessage(s.T(), err, "ERROR 1046 (3D000): No database selected") + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE wrongdb.t") + checkClientErrorMessage(s.T(), err, "ERROR 1146 (42S02): Table 'wrongdb.t' doesn't exist") + + s.tk.MustExec("CREATE DATABASE load_csv;") + s.tk.MustExec("USE load_csv;") + s.tk.MustExec("CREATE TABLE t (i INT PRIMARY KEY, s varchar(32));") + + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (wrong)") + checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") + // This behaviour is different from MySQL + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (i,i)") + checkClientErrorMessage(s.T(), err, "ERROR 1110 (42000): Column 'i' specified twice") + err = s.tk.ExecToErr("LOAD DATA INFILE 'gs://1' INTO TABLE t (@v) SET wrong=@v") + checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") + err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") + checkClientErrorMessage(s.T(), err, + "ERROR 8158 (HY000): The URI of INFILE is invalid. Reason: storage abc not support yet. 
Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") + err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") + checkClientErrorMessage(s.T(), err, + "ERROR 8159 (HY000): Access to the source file has been denied. Please check the URI, access key and secret access key are correct") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' + INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + "ERROR 8160 (HY000): Failed to read source files. Reason: failed to read gcs file, file info: input.bucket='wrong-bucket', input.key='p'. Please check the INFILE path is correct") + + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-tsv", + Name: "t.tsv", + }, + Content: []byte("1\t2\n" + + "1\t4\n"), + }) + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + FORMAT '123' INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + "ERROR 8157 (HY000): The FORMAT '123' is not supported") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + FORMAT 'sql file' INTO TABLE t;`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + "ERROR 8160 (HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)") + err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + INTO TABLE t LINES STARTING BY '\n';`, gcsEndpoint)) + checkClientErrorMessage(s.T(), err, + `ERROR 8162 (HY000): STARTING BY ' +' cannot contain LINES TERMINATED BY ' +'`) + + // TODO: fix these tests + //s.tk.MustExec("CREATE TABLE t2 (c1 INT, c2 INT, c3 INT);") + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t2;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, + // "ERROR 1261 (01000): Row 1 doesn't contain data for all columns") + //s.tk.MustExec("CREATE TABLE t3 (c1 INT);") + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t3;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, + // "ERROR 1262 (01000): Row 1 was truncated; it contained more data than there were input columns") + + // TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec() + + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' + // INTO TABLE t;`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, "ERROR 1062 (23000): Duplicate entry '1' for key 'PRIMARY'") + + //s.server.CreateObject(fakestorage.Object{ + // ObjectAttrs: fakestorage.ObjectAttrs{ + // BucketName: "test-tsv", + // Name: "t2.tsv", + // }, + // Content: []byte("null\t2\n" + + // "3\t4\n"), + //}) + //err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s' + // INTO TABLE t NULL DEFINED BY 'null';`, gcsEndpoint)) + //checkClientErrorMessage(s.T(), err, "ERROR 8154 (HY000): LOAD DATA raises error(s): xxx") +} diff --git a/executor/loadremotetest/one_csv_test.go b/executor/loadremotetest/one_csv_test.go index ff9fd277f7dfe..cc98e6ff68af8 100644 --- a/executor/loadremotetest/one_csv_test.go +++ b/executor/loadremotetest/one_csv_test.go @@ -81,7 +81,7 @@ func (s *mockGCSSuite) TestLoadCSV() { // 
can't read file at tidb-server sql = "LOAD DATA INFILE '/etc/passwd' INTO TABLE load_csv.t;" - s.tk.MustContainErrMsg(sql, "don't support load data from tidb-server") + s.tk.MustContainErrMsg(sql, "Don't support load data from tidb-server's disk. Or if you want to load local data via client, the path of INFILE '/etc/passwd' needs to specify the clause of LOCAL first") } func (s *mockGCSSuite) TestIgnoreNLines() { diff --git a/executor/loadremotetest/one_sqldump_test.go b/executor/loadremotetest/one_sqldump_test.go index ec320787e3b05..e2755c8f0d74b 100644 --- a/executor/loadremotetest/one_sqldump_test.go +++ b/executor/loadremotetest/one_sqldump_test.go @@ -36,7 +36,7 @@ func (s *mockGCSSuite) TestLoadSQLDump() { }) sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-load-parquet/p?endpoint=%s' - FORMAT 'SQLDumpFile' INTO TABLE load_csv.t;`, gcsEndpoint) + FORMAT 'SQL file' INTO TABLE load_csv.t;`, gcsEndpoint) s.tk.MustExec(sql) s.tk.MustQuery("SELECT * FROM load_csv.t;").Check(testkit.Rows( "1 a", diff --git a/executor/utils.go b/executor/utils.go index 47fe32a93aa68..4997eb9d277a8 100644 --- a/executor/utils.go +++ b/executor/utils.go @@ -16,6 +16,8 @@ package executor import ( "strings" + + "github.com/pingcap/errors" ) // SetFromString constructs a slice of strings from a comma separated string. @@ -92,3 +94,20 @@ func (b *batchRetrieverHelper) nextBatch(retrieveRange func(start, end int) erro } return nil } + +// TODO: add GetMsg() to errors package to replace this function. +// see TestGetMsgFromBRError for more details. +func getMsgFromBRError(err error) string { + if err == nil { + return "" + } + if berr, ok := err.(*errors.Error); ok { + return berr.GetMsg() + } + raw := err.Error() + berrMsg := errors.Cause(err).Error() + if len(raw) <= len(berrMsg)+len(": ") { + return raw + } + return raw[:len(raw)-len(berrMsg)-len(": ")] +} diff --git a/executor/utils_test.go b/executor/utils_test.go index 3c8a32de25cc5..e0795228141a5 100644 --- a/executor/utils_test.go +++ b/executor/utils_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/stretchr/testify/require" ) @@ -93,3 +94,12 @@ func TestBatchRetrieverHelper(t *testing.T) { require.Equal(t, rangeStarts, []int{0}) require.Equal(t, rangeEnds, []int{10}) } + +func TestGetMsgFromBRError(t *testing.T) { + var berr error = berrors.ErrStorageInvalidConfig + require.Equal(t, "[BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "invalid external storage config", getMsgFromBRError(berr)) + berr = errors.Annotatef(berr, "some message about error reason") + require.Equal(t, "some message about error reason: [BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "some message about error reason", getMsgFromBRError(berr)) +} diff --git a/executor/writetest/write_test.go b/executor/writetest/write_test.go index 021e678708817..fdd8991a10f2e 100644 --- a/executor/writetest/write_test.go +++ b/executor/writetest/write_test.go @@ -1874,7 +1874,7 @@ type testCase struct { func checkCases( tests []testCase, - ld *executor.LoadDataInfo, + ld *executor.LoadDataWorker, t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, @@ -1903,7 +1903,7 @@ func checkCases( require.NoError(t, err1) err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err1) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() 
ld.SetMessage() require.Equal(t, tt.expectedMsg, tk.Session().LastMessage()) ctx.StmtCommit(context.Background()) @@ -1924,7 +1924,7 @@ func TestLoadDataMissingColumn(t *testing.T) { tk.MustExec(createSQL) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_missing") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -1974,7 +1974,7 @@ func TestIssue18681(t *testing.T) { tk.MustExec(createSQL) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -1983,7 +1983,7 @@ func TestIssue18681(t *testing.T) { selectSQL := "select bin(a), bin(b), bin(c), bin(d) from load_data_test;" ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() sc := ctx.GetSessionVars().StmtCtx originIgnoreTruncate := sc.IgnoreTruncate @@ -2026,7 +2026,7 @@ func TestIssue34358(t *testing.T) { tk.MustExec("create table load_data_test (a varchar(10), b varchar(10))") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ( @v1, @v2 ) set a = @v1, b = @v2") - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) require.NotNil(t, ld) checkCases([]testCase{ @@ -2048,7 +2048,7 @@ func TestLoadData(t *testing.T) { require.Error(t, err) tk.MustExec("load data local infile '/tmp/nonexistence.csv' ignore into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2070,7 +2070,7 @@ func TestLoadData(t *testing.T) { require.NoError(t, err) err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() r := tk.MustQuery(selectSQL) r.Check(nil) @@ -2229,7 +2229,7 @@ func TestLoadDataEscape(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2260,7 +2260,7 @@ func TestLoadDataSpecifiedColumns(t *testing.T) { tk.MustExec(`create table load_data_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 varchar(255) default "def", c3 int default 0);`) tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test (c1, c2)") ctx := tk.Session().(sessionctx.Context) - ld, ok := 
ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2286,7 +2286,7 @@ func TestLoadDataIgnoreLines(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ignore 1 lines") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2311,7 +2311,7 @@ func TestLoadDataNULL(t *testing.T) { tk.MustExec(`load data local infile '/tmp/nonexistence.csv' into table load_data_test FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n';`) ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2338,7 +2338,7 @@ func TestLoadDataReplace(t *testing.T) { tk.MustExec("INSERT INTO load_data_replace VALUES(1,'val 1'),(2,'val 2')") tk.MustExec("LOAD DATA LOCAL INFILE '/tmp/nonexistence.csv' REPLACE INTO TABLE load_data_replace") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2359,7 +2359,7 @@ func TestLoadDataOverflowBigintUnsigned(t *testing.T) { tk.MustExec("CREATE TABLE load_data_test (a bigint unsigned);") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2380,7 +2380,7 @@ func TestLoadDataWithUppercaseUserVars(t *testing.T) { tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test (@V1)" + " set a = @V1, b = @V1*100") ctx := tk.Session().(sessionctx.Context) - ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.True(t, ok) defer ctx.SetValue(executor.LoadDataVarKey, nil) require.NotNil(t, ld) @@ -2402,7 +2402,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) { "partition p2 values less than (11))") tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table range_t fields terminated by ','") ctx := tk.Session().(sessionctx.Context) - ld := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + ld := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker) require.Nil(t, sessiontxn.NewTxn(context.Background(), ctx)) parser, err := mydump.NewCSVParser( @@ -2419,7 +2419,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) { require.NoError(t, err) err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) require.NoError(t, err) - ld.SetMaxRowsInBatch(20000) + ld.ResetBatch() ld.SetMessage() 
ctx.StmtCommit(context.Background()) txn, err := ctx.Txn(true) diff --git a/server/conn.go b/server/conn.go index be27865cb6ed5..8de0dbd1287b7 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1616,23 +1616,29 @@ func (cc *clientConn) writeReq(ctx context.Context, filePath string) error { // handleLoadData does the additional work after processing the 'load data' query. // It sends client a file path, then reads the file content from client, inserts data into database. -func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error { +func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *executor.LoadDataWorker) error { // If the server handles the load data request, the client has to set the ClientLocalFiles capability. if cc.capability&mysql.ClientLocalFiles == 0 { return errNotAllowedCommand } - if loadDataInfo == nil { + if loadDataWorker == nil { return errors.New("load data info is empty") } - err := cc.writeReq(ctx, loadDataInfo.Path) + err := cc.writeReq(ctx, loadDataWorker.Path) if err != nil { return err } - // use Pipe to convert cc.readPacket to io.Reader - r, w := io.Pipe() + var ( + // use Pipe to convert cc.readPacket to io.Reader + r, w = io.Pipe() + drained bool + wg sync.WaitGroup + ) + wg.Add(1) go func() { + defer wg.Done() //nolint: errcheck defer w.Close() @@ -1649,7 +1655,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_local_infile_request.html if len(data) == 0 { - loadDataInfo.Drained = true + drained = true return } } @@ -1663,16 +1669,19 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } }() - err = loadDataInfo.Load(ctx, executor.NewSimpleSeekerOnReadCloser(r)) + err = loadDataWorker.Load(ctx, executor.NewSimpleSeekerOnReadCloser(r)) + _ = r.Close() + wg.Wait() + if err != nil { - if !loadDataInfo.Drained { + if !drained { logutil.Logger(ctx).Info("not drained yet, try reading left data from client connection") } // drain the data from client conn util empty packet received, otherwise the connection will be reset // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_local_infile_request.html - for !loadDataInfo.Drained { + for !drained { // check kill flag again, let the draining loop could quit if empty packet could not be received - if atomic.CompareAndSwapUint32(&loadDataInfo.Ctx.GetSessionVars().Killed, 1, 0) { + if atomic.CompareAndSwapUint32(&loadDataWorker.Ctx.GetSessionVars().Killed, 1, 0) { logutil.Logger(ctx).Warn("receiving kill, stop draining data, connection may be reset") return executor.ErrQueryInterrupted } @@ -1682,13 +1691,12 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor break } if len(curData) == 0 { - loadDataInfo.Drained = true + drained = true logutil.Logger(ctx).Info("draining finished for error", zap.Error(err)) break } } } - loadDataInfo.SetMessage() return err } @@ -2083,7 +2091,7 @@ func (cc *clientConn) handleFileTransInConn(ctx context.Context, status uint16) handled = true defer cc.ctx.SetValue(executor.LoadDataVarKey, nil) //nolint:forcetypeassert - if err := cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataInfo)); err != nil { + if err := cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataWorker)); err != nil { return handled, err } } diff --git a/server/server_test.go b/server/server_test.go index 583cee98341fd..4e52453885b27 100644 
--- a/server/server_test.go +++ b/server/server_test.go @@ -1037,10 +1037,10 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { // can't insert into views (in TiDB) or sequences. issue #20880 _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table v1", path)) require.Error(t, err) - require.Equal(t, "Error 1105 (HY000): can only load data into base tables", err.Error()) + require.Equal(t, "Error 1288 (HY000): The target table v1 of the LOAD is not updatable", err.Error()) _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table s1", path)) require.Error(t, err) - require.Equal(t, "Error 1105 (HY000): can only load data into base tables", err.Error()) + require.Equal(t, "Error 1288 (HY000): The target table s1 of the LOAD is not updatable", err.Error()) rs, err1 := dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table test", path)) require.NoError(t, err1)
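
Note on the server/conn.go change above: handleLoadData no longer keeps a Drained field on the executor object; it feeds client packets into an io.Pipe from a goroutine, lets LoadDataWorker.Load consume the read end, then closes the reader and waits on a sync.WaitGroup before checking a local drained flag. Below is a minimal sketch of that shape, not the TiDB server code; readPacket and the packet slice are stand-ins for clientConn.readPacket and the MySQL local-infile packet stream.

package main

import (
	"fmt"
	"io"
	"sync"
)

func main() {
	// nil stands for the empty packet that marks the end of the client's file.
	packets := [][]byte{[]byte("1,a\n"), []byte("2,b\n"), nil}
	readPacket := func(i int) []byte { return packets[i] }

	r, w := io.Pipe()
	var (
		drained bool
		wg      sync.WaitGroup
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer w.Close()
		for i := 0; ; i++ {
			data := readPacket(i)
			if len(data) == 0 { // empty packet: client finished sending the file
				drained = true
				return
			}
			if _, err := w.Write(data); err != nil {
				return
			}
		}
	}()

	// The "load worker" side: consume everything from the pipe reader.
	content, err := io.ReadAll(r)
	_ = r.Close() // unblock the writer in case the worker stopped early
	wg.Wait()     // the feeding goroutine is done, so reading drained here is race-free
	fmt.Printf("loaded %d bytes, drained=%v, err=%v\n", len(content), drained, err)
}
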