diff --git a/errno/errcode.go b/errno/errcode.go index 5712e713a58ac..9e5bd187875b6 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1061,7 +1061,7 @@ const ( ErrLoadDataUnsupportedOption = 8166 ErrLoadDataJobNotFound = 8170 ErrLoadDataInvalidOperation = 8171 - ErrLoadDataCantDetachWithLocal = 8172 + ErrLoadDataLocalUnsupportedOption = 8172 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index 6b58fc4b8e594..c338fbe733de9 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1054,7 +1054,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil), ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil), ErrLoadDataInvalidOperation: mysql.Message("The current job status cannot perform the operation. %s", nil), - ErrLoadDataCantDetachWithLocal: mysql.Message("The job can not be DETACHED when LOAD DATA LOCAL INFILE", nil), + ErrLoadDataLocalUnsupportedOption: mysql.Message("Unsupported option for LOAD DATA LOCAL INFILE: %s", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/errors.toml b/errors.toml index 5a6dee6111c3a..06816c4bce249 100644 --- a/errors.toml +++ b/errors.toml @@ -1753,7 +1753,7 @@ The current job status cannot perform the operation. %s ["executor:8172"] error = ''' -The job can not be DETACHED when LOAD DATA LOCAL INFILE +Unsupported option for LOAD DATA LOCAL INFILE: %s ''' ["executor:8212"] diff --git a/executor/asyncloaddata/BUILD.bazel b/executor/asyncloaddata/BUILD.bazel index a084d50343ea4..ca11b966c0f87 100644 --- a/executor/asyncloaddata/BUILD.bazel +++ b/executor/asyncloaddata/BUILD.bazel @@ -14,10 +14,13 @@ go_library( "//types", "//util/chunk", "//util/dbterror/exeerrors", + "//util/logutil", "//util/sqlexec", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_tikv_client_go_v2//util", "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", ], ) diff --git a/executor/asyncloaddata/operate_test.go b/executor/asyncloaddata/operate_test.go index 279117e695362..d88804b63a476 100644 --- a/executor/asyncloaddata/operate_test.go +++ b/executor/asyncloaddata/operate_test.go @@ -44,8 +44,8 @@ func (s *mockGCSSuite) TestOperateRunningJob() { HeartBeatInSec = backup }) - s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCreateLoadDataJob", `sleep(1000)`) - s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(1000)`) + s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/AfterCreateLoadDataJob", `sleep(1000)`) + s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/AfterStartJob", `sleep(1000)`) s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(1000)`) sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s' INTO TABLE test_operate.t WITH batch_size = 1;`, gcsEndpoint) diff --git a/executor/asyncloaddata/progress.go b/executor/asyncloaddata/progress.go index db0e8d3dfb460..c017332cc3332 100644 --- a/executor/asyncloaddata/progress.go +++ b/executor/asyncloaddata/progress.go @@ -24,7 +24,7 @@ import ( type Progress struct { // SourceFileSize is the size of the source file in bytes. 
When we can't get // the size of the source file, it will be set to -1. - // Currently the value is read by seek(0, end), when LOAD DATA LOCAL we wrap + // Currently, the value is read by seek(0, end), when LOAD DATA LOCAL we wrap // SimpleSeekerOnReadCloser on MySQL client connection which doesn't support // it. SourceFileSize int64 @@ -52,7 +52,7 @@ func (p *Progress) String() string { return string(bs) } -// ProgressFromJSON creates a Progress from a JSON string. +// ProgressFromJSON creates Progress from a JSON string. func ProgressFromJSON(bs []byte) (*Progress, error) { var p Progress err := json.Unmarshal(bs, &p) diff --git a/executor/asyncloaddata/show_test.go b/executor/asyncloaddata/show_test.go index 3e4087de677e8..370404d160f02 100644 --- a/executor/asyncloaddata/show_test.go +++ b/executor/asyncloaddata/show_test.go @@ -277,7 +277,7 @@ func (s *mockGCSSuite) TestInternalStatus() { userStr := tk2.Session().GetSessionVars().User.String() // wait for the load data job to be created - <-executor.TestSyncCh + <-TestSyncCh jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), userStr) require.NoError(s.T(), err) @@ -319,12 +319,18 @@ func (s *mockGCSSuite) TestInternalStatus() { r.checkIgnoreTimes(s.T(), row) // resume the load data job - executor.TestSyncCh <- struct{}{} + TestSyncCh <- struct{}{} // wait for the load data job to be started - <-executor.TestSyncCh + <-TestSyncCh - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + job := &Job{ + ID: id, + Conn: tk2.Session(), + User: userStr, + } + + info, err = job.GetJobInfo(ctx) require.NoError(s.T(), err) expected.StartTime = info.StartTime expected.Status = JobRunning @@ -337,20 +343,20 @@ func (s *mockGCSSuite) TestInternalStatus() { r.checkIgnoreTimes(s.T(), row) // resume the load data job - executor.TestSyncCh <- struct{}{} + TestSyncCh <- struct{}{} // wait for the first task to be committed <-executor.TestSyncCh // wait for UpdateJobProgress require.Eventually(s.T(), func() bool { - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + info, err = job.GetJobInfo(ctx) if err != nil { return false } return info.Progress == `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}` }, 6*time.Second, time.Millisecond*100) - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + info, err = job.GetJobInfo(ctx) require.NoError(s.T(), err) expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}` require.Equal(s.T(), expected, info) @@ -370,7 +376,7 @@ func (s *mockGCSSuite) TestInternalStatus() { // wait for UpdateJobProgress require.Eventually(s.T(), func() bool { - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + info, err = job.GetJobInfo(ctx) if err != nil { return false } @@ -387,14 +393,14 @@ func (s *mockGCSSuite) TestInternalStatus() { executor.TestSyncCh <- struct{}{} require.Eventually(s.T(), func() bool { - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + info, err = job.GetJobInfo(ctx) if err != nil { return false } return info.Status == JobFinished }, 6*time.Second, 100*time.Millisecond) - info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) + info, err = job.GetJobInfo(ctx) require.NoError(s.T(), err) expected.Status = JobFinished expected.EndTime = info.EndTime @@ -427,8 +433,8 @@ func (s *mockGCSSuite) TestInternalStatus() { config.BufferSizeScale = backup3 }) - s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCreateLoadDataJob", `return`) - s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterStartJob", `return`) + 
s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/SyncAfterCreateLoadDataJob", `return`)
+	s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/SyncAfterStartJob", `return`)
 	s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCommitOneTask", `return`)
 	sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t*.tsv?endpoint=%s' INTO TABLE load_tsv.t WITH batch_size = 1;`, gcsEndpoint)
diff --git a/executor/asyncloaddata/util.go b/executor/asyncloaddata/util.go
index 3a40e993fe372..edea6b2bbaaa2 100644
--- a/executor/asyncloaddata/util.go
+++ b/executor/asyncloaddata/util.go
@@ -18,17 +18,34 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"time"

 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/dbterror/exeerrors"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/tikv/client-go/v2/util"
+	"go.uber.org/zap"
 )

+// Job is an import job.
+type Job struct {
+	ID int64
+	// Job doesn't manage the life cycle of the connection.
+	Conn sqlexec.SQLExecutor
+	User string
+}
+
+// NewJob returns a new Job.
+func NewJob(ID int64, conn sqlexec.SQLExecutor, user string) *Job {
+	return &Job{ID: ID, Conn: conn, User: user}
+}
+
 // CreateLoadDataJob creates a load data job by insert a record to system table.
 // The AUTO_INCREMENT value will be returned as jobID.
 func CreateLoadDataJob(
@@ -37,7 +54,7 @@ func CreateLoadDataJob(
 	dataSource, db, table string,
 	importMode string,
 	user string,
-) (int64, error) {
+) (*Job, error) {
 	// remove the params in data source URI because it may contains AK/SK
 	u, err := url.Parse(dataSource)
 	if err == nil && u.Scheme != "" {
@@ -52,38 +69,52 @@ func CreateLoadDataJob(
 		VALUES (%?, %?, %?, %?, %?);`,
 		dataSource, db, table, importMode, user)
 	if err != nil {
-		return 0, err
+		return nil, err
 	}
 	rs, err := conn.ExecuteInternal(ctx, `SELECT LAST_INSERT_ID();`)
 	if err != nil {
-		return 0, err
+		return nil, err
 	}
 	//nolint: errcheck
 	defer rs.Close()
 	rows, err := sqlexec.DrainRecordSet(ctx, rs, 1)
 	if err != nil {
-		return 0, err
+		return nil, err
 	}
 	if len(rows) != 1 {
-		return 0, errors.Errorf("unexpected result length: %d", len(rows))
+		return nil, errors.Errorf("unexpected result length: %d", len(rows))
 	}
-	return rows[0].GetInt64(0), nil
+	return NewJob(rows[0].GetInt64(0), conn, user), nil
 }

+// TestSyncCh is used in unit test to synchronize the execution of LOAD DATA.
+var TestSyncCh = make(chan struct{})
+
 // StartJob tries to start a not-yet-started job with jobID. It will not return
 // error when there's no matched job.
-func StartJob(
-	ctx context.Context,
-	conn sqlexec.SQLExecutor,
-	jobID int64,
-) error {
+func (j *Job) StartJob(ctx context.Context) error {
+	failpoint.Inject("AfterCreateLoadDataJob", nil)
+	failpoint.Inject("SyncAfterCreateLoadDataJob", func() {
+		TestSyncCh <- struct{}{}
+		<-TestSyncCh
+	})
+
 	ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData)
-	_, err := conn.ExecuteInternal(ctx,
+	_, err := j.Conn.ExecuteInternal(ctx,
 		`UPDATE mysql.load_data_jobs
 		SET start_time = CURRENT_TIMESTAMP(6), update_time = CURRENT_TIMESTAMP(6)
 		WHERE job_id = %?
AND start_time IS NULL AND end_time IS NULL;`, - jobID) - return err + j.ID) + if err != nil { + return err + } + + failpoint.Inject("AfterStartJob", nil) + failpoint.Inject("SyncAfterStartJob", func() { + TestSyncCh <- struct{}{} + <-TestSyncCh + }) + return nil } var ( @@ -101,81 +132,61 @@ var ( // TODO: Currently if the node is crashed after CreateLoadDataJob and before StartJob, // it will always be in the status of pending. Maybe we should unify CreateLoadDataJob // and StartJob. -func UpdateJobProgress( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - progress string, -) (bool, error) { +func (j *Job) UpdateJobProgress(ctx context.Context, progress string) (bool, error) { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) // let TiDB handle heartbeat check for concurrent SQL // we tolerate 2 times of failure/timeout when updating heartbeat - _, err := conn.ExecuteInternal(ctx, + _, err := j.Conn.ExecuteInternal(ctx, `UPDATE mysql.load_data_jobs SET progress = %?, update_time = CURRENT_TIMESTAMP(6) WHERE job_id = %? AND end_time IS NULL AND (update_time >= DATE_SUB(CURRENT_TIMESTAMP(6), INTERVAL %? SECOND) OR update_time IS NULL);`, - progress, jobID, OfflineThresholdInSec) + progress, j.ID, OfflineThresholdInSec) if err != nil { return false, err } - return conn.GetSessionVars().StmtCtx.AffectedRows() == 1, nil + return j.Conn.GetSessionVars().StmtCtx.AffectedRows() == 1, nil } // FinishJob finishes a load data job. A job can only be finished once. -func FinishJob( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - result string, -) error { +func (j *Job) FinishJob(ctx context.Context, result string) error { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err := conn.ExecuteInternal(ctx, + _, err := j.Conn.ExecuteInternal(ctx, `UPDATE mysql.load_data_jobs SET end_time = CURRENT_TIMESTAMP(6), result_message = %? WHERE job_id = %? AND result_message IS NULL AND error_message IS NULL;`, - result, jobID) + result, j.ID) return err } // FailJob fails a load data job. A job can only be failed once. -func FailJob( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - result string, -) error { +func (j *Job) FailJob(ctx context.Context, result string) error { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err := conn.ExecuteInternal(ctx, + _, err := j.Conn.ExecuteInternal(ctx, `UPDATE mysql.load_data_jobs SET end_time = CURRENT_TIMESTAMP(6), error_message = %? WHERE job_id = %? AND result_message IS NULL AND error_message IS NULL;`, - result, jobID) + result, j.ID) return err } // CancelJob cancels a load data job. Only a running/paused job can be canceled. 
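
Taken together, CreateLoadDataJob, StartJob, UpdateJobProgress and FinishJob/FailJob form the whole lifecycle of a job record. A minimal happy-path sketch of driving that API, assuming the caller already holds a context and a sqlexec.SQLExecutor (the data-source arguments are placeholders, not values from this patch):

package example

import (
	"context"

	"github.com/pingcap/tidb/executor/asyncloaddata"
	"github.com/pingcap/tidb/util/sqlexec"
)

// runOneJob is a sketch only: create the job row, mark it started,
// heartbeat once, then record the terminal state.
func runOneJob(ctx context.Context, conn sqlexec.SQLExecutor) error {
	job, err := asyncloaddata.CreateLoadDataJob(
		ctx, conn, "/tmp/test.csv", "test", "t", "logical", "root@%")
	if err != nil {
		return err
	}
	if err = job.StartJob(ctx); err != nil {
		return err
	}
	// ok == false means the job was canceled or the keepalive window
	// (OfflineThresholdInSec) expired, so the worker should stop.
	ok, err := job.UpdateJobProgress(ctx, `{"LoadedRowCnt":1}`)
	if err != nil {
		return err
	}
	if !ok {
		return job.FailJob(ctx, "interrupted or failed to keepalive")
	}
	return job.FinishJob(ctx, "finished message")
}
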
-func CancelJob( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - user string, -) (err error) { +func (j *Job) CancelJob(ctx context.Context) (err error) { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err = conn.ExecuteInternal(ctx, "BEGIN PESSIMISTIC;") + _, err = j.Conn.ExecuteInternal(ctx, "BEGIN PESSIMISTIC;") if err != nil { return err } defer func() { if err != nil { - _, err1 := conn.ExecuteInternal(ctx, "ROLLBACK;") + _, err1 := j.Conn.ExecuteInternal(ctx, "ROLLBACK;") terror.Log(err1) return } - _, err = conn.ExecuteInternal(ctx, "COMMIT;") + _, err = j.Conn.ExecuteInternal(ctx, "COMMIT;") if err != nil { return } @@ -185,10 +196,10 @@ func CancelJob( rs sqlexec.RecordSet rows []chunk.Row ) - rs, err = conn.ExecuteInternal(ctx, + rs, err = j.Conn.ExecuteInternal(ctx, `SELECT expected_status, end_time, error_message FROM mysql.load_data_jobs WHERE job_id = %? AND create_user = %?;`, - jobID, user) + j.ID, j.User) if err != nil { return err } @@ -199,7 +210,7 @@ func CancelJob( } if len(rows) < 1 { - return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID) + return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(j.ID) } status := rows[0].GetEnum(0).String() if status != "running" && status != "paused" { @@ -214,37 +225,83 @@ func CancelJob( return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs("need status running or paused, but got finished") } - _, err = conn.ExecuteInternal(ctx, + _, err = j.Conn.ExecuteInternal(ctx, `UPDATE mysql.load_data_jobs SET expected_status = 'canceled', end_time = CURRENT_TIMESTAMP(6), error_message = 'canceled by user' WHERE job_id = %?;`, - jobID) + j.ID) return err } // DropJob drops a load data job. -func DropJob( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - user string, -) error { +func (j *Job) DropJob(ctx context.Context) error { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err := conn.ExecuteInternal(ctx, + _, err := j.Conn.ExecuteInternal(ctx, `DELETE FROM mysql.load_data_jobs WHERE job_id = %? AND create_user = %?;`, - jobID, user) + j.ID, j.User) if err == nil { return err } - if conn.GetSessionVars().StmtCtx.AffectedRows() < 1 { - return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID) + if j.Conn.GetSessionVars().StmtCtx.AffectedRows() < 1 { + return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(j.ID) } return nil } +// OnComplete is called when a job is finished or failed. +func (j *Job) OnComplete(inErr error, msg string) { + // write the ending status even if user context is canceled. + ctx2 := context.Background() + ctx2 = kv.WithInternalSourceType(ctx2, kv.InternalLoadData) + if inErr == nil { + err2 := j.FinishJob(ctx2, msg) + terror.Log(err2) + return + } + errMsg := inErr.Error() + if errImpl, ok := errors.Cause(inErr).(*errors.Error); ok { + b, marshalErr := errImpl.MarshalJSON() + if marshalErr == nil { + errMsg = string(b) + } + } + + err2 := j.FailJob(ctx2, errMsg) + terror.Log(err2) +} + +// ProgressUpdateRoutineFn job progress update routine. 
+func (j *Job) ProgressUpdateRoutineFn(ctx context.Context, finishCh chan struct{}, errCh <-chan struct{}, progress *Progress) error { + ticker := time.NewTicker(time.Duration(HeartBeatInSec) * time.Second) + defer ticker.Stop() + + for { + select { + case <-finishCh: + // When done, try to update progress to reach 100% + ok, err2 := j.UpdateJobProgress(ctx, progress.String()) + if !ok || err2 != nil { + logutil.Logger(ctx).Warn("failed to update job progress when finished", + zap.Bool("ok", ok), zap.Error(err2)) + } + return nil + case <-errCh: + return nil + case <-ticker.C: + ok, err2 := j.UpdateJobProgress(ctx, progress.String()) + if err2 != nil { + return err2 + } + if !ok { + return errors.Errorf("failed to update job progress, the job %d is interrupted by user or failed to keepalive", j.ID) + } + } + } +} + // JobExpectedStatus is the expected status of a load data job. User can set the // expected status of a job and worker will respect it. type JobExpectedStatus int @@ -328,13 +385,9 @@ func (s JobStatus) String() string { // GetJobStatus gets the status of a load data job. The returned error means // something wrong when querying the database. Other business logic errors are // returned as JobFailed with message. -func GetJobStatus( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, -) (JobStatus, string, error) { +func (j *Job) GetJobStatus(ctx context.Context) (JobStatus, string, error) { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - rs, err := conn.ExecuteInternal(ctx, + rs, err := j.Conn.ExecuteInternal(ctx, `SELECT expected_status, update_time >= DATE_SUB(CURRENT_TIMESTAMP(6), INTERVAL %? SECOND) AS is_alive, @@ -344,7 +397,7 @@ func GetJobStatus( start_time FROM mysql.load_data_jobs WHERE job_id = %?;`, - OfflineThresholdInSec, jobID) + OfflineThresholdInSec, j.ID) if err != nil { return JobFailed, "", err } @@ -354,7 +407,7 @@ func GetJobStatus( return JobFailed, "", err } if len(rows) != 1 { - return JobFailed, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID).Error(), nil + return JobFailed, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(j.ID).Error(), nil } return getJobStatus(rows[0]) @@ -422,14 +475,9 @@ type JobInfo struct { } // GetJobInfo gets all needed information of a load data job. -func GetJobInfo( - ctx context.Context, - conn sqlexec.SQLExecutor, - jobID int64, - user string, -) (*JobInfo, error) { +func (j *Job) GetJobInfo(ctx context.Context) (*JobInfo, error) { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - rs, err := conn.ExecuteInternal(ctx, + rs, err := j.Conn.ExecuteInternal(ctx, `SELECT expected_status, update_time >= DATE_SUB(CURRENT_TIMESTAMP(6), INTERVAL %? SECOND) AS is_alive, @@ -448,7 +496,7 @@ func GetJobInfo( create_time FROM mysql.load_data_jobs WHERE job_id = %? 
AND create_user = %?;`, - OfflineThresholdInSec, jobID, user) + OfflineThresholdInSec, j.ID, j.User) if err != nil { return nil, err } @@ -458,7 +506,7 @@ func GetJobInfo( return nil, err } if len(rows) != 1 { - return nil, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID) + return nil, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(j.ID) } return getJobInfo(rows[0]) diff --git a/executor/asyncloaddata/util_test.go b/executor/asyncloaddata/util_test.go index b00ee1ee8648c..c58519adc222e 100644 --- a/executor/asyncloaddata/util_test.go +++ b/executor/asyncloaddata/util_test.go @@ -33,13 +33,13 @@ func checkEqualIgnoreTimes(t *testing.T, expected, got *JobInfo) { require.Equal(t, &cloned, got) } -func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (int64, *JobInfo) { - id, err := CreateLoadDataJob(context.Background(), conn, "/tmp/test.csv", "test", "t", "logical", user) +func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (*Job, *JobInfo) { + job, err := CreateLoadDataJob(context.Background(), conn, "/tmp/test.csv", "test", "t", "logical", user) require.NoError(t, err) - info, err := GetJobInfo(context.Background(), conn, id, user) + info, err := job.GetJobInfo(context.Background()) require.NoError(t, err) expected := &JobInfo{ - JobID: id, + JobID: job.ID, User: user, DataSource: "/tmp/test.csv", TableSchema: "test", @@ -50,7 +50,7 @@ func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (int64, *Job StatusMessage: "", } checkEqualIgnoreTimes(t, expected, info) - return id, info + return job, info } func TestHappyPath(t *testing.T) { @@ -60,7 +60,7 @@ func TestHappyPath(t *testing.T) { // job is created - id, expected := createJob(t, tk.Session(), "user") + job, expected := createJob(t, tk.Session(), "user") // job is started by a worker @@ -69,56 +69,56 @@ func TestHappyPath(t *testing.T) { t.Cleanup(func() { OfflineThresholdInSec = backup }) - err := StartJob(ctx, tk.Session(), id) + err := job.StartJob(ctx) require.NoError(t, err) - info, err := GetJobInfo(ctx, tk.Session(), id, "user") + info, err := job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobRunning checkEqualIgnoreTimes(t, expected, info) // job is periodically updated by worker - ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 10%") + ok, err := job.UpdateJobProgress(ctx, "imported 10%") require.NoError(t, err) require.True(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Progress = "imported 10%" checkEqualIgnoreTimes(t, expected, info) // job is paused - err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobPaused checkEqualIgnoreTimes(t, expected, info) // worker still can update progress, maybe response to pausing is delayed - ok, err = UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") + ok, err = job.UpdateJobProgress(ctx, "imported 20%") require.NoError(t, err) require.True(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Progress = "imported 20%" checkEqualIgnoreTimes(t, expected, info) // job is resumed - err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + err = UpdateJobExpectedStatus(ctx, 
tk.Session(), job.ID, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobRunning checkEqualIgnoreTimes(t, expected, info) // job is finished - err = FinishJob(ctx, tk.Session(), id, "finished message") + err = job.FinishJob(ctx, "finished message") require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobFinished expected.StatusMessage = "finished message" @@ -132,7 +132,7 @@ func TestKeepAlive(t *testing.T) { // job is created - id, expected := createJob(t, tk.Session(), "user") + job, expected := createJob(t, tk.Session(), "user") backup := OfflineThresholdInSec OfflineThresholdInSec = 1 @@ -144,13 +144,13 @@ func TestKeepAlive(t *testing.T) { // TODO:👆not correct! time.Sleep(2 * time.Second) - info, err := GetJobInfo(ctx, tk.Session(), id, "user") + info, err := job.GetJobInfo(ctx) require.NoError(t, err) checkEqualIgnoreTimes(t, expected, info) - err = StartJob(ctx, tk.Session(), id) + err = job.StartJob(ctx) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobRunning checkEqualIgnoreTimes(t, expected, info) @@ -158,7 +158,7 @@ func TestKeepAlive(t *testing.T) { // if worker failed to keepalive, job will fail time.Sleep(2 * time.Second) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "job expected running but the node is timeout" @@ -166,31 +166,31 @@ func TestKeepAlive(t *testing.T) { // after the worker is failed to keepalive, further keepalive will fail - ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") + ok, err := job.UpdateJobProgress(ctx, "imported 20%") require.NoError(t, err) require.False(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) checkEqualIgnoreTimes(t, expected, info) // when worker fails to keepalive, before it calls FailJob, it still can // change expected status to some extent. 
- err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.StatusMessage = "job expected paused but the node is timeout" checkEqualIgnoreTimes(t, expected, info) - err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.StatusMessage = "job expected running but the node is timeout" checkEqualIgnoreTimes(t, expected, info) - err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedCanceled) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobCanceled expected.StatusMessage = "" @@ -199,9 +199,9 @@ func TestKeepAlive(t *testing.T) { // Now the worker calls FailJob, but the status should still be canceled, // that's more friendly. - err = FailJob(ctx, tk.Session(), id, "failed to keepalive") + err = job.FailJob(ctx, "failed to keepalive") require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobCanceled expected.StatusMessage = "failed to keepalive" @@ -215,13 +215,13 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { // job is created - id, expected := createJob(t, tk.Session(), "user") + job, expected := createJob(t, tk.Session(), "user") // job can be failed directly when it's pending - err := FailJob(ctx, tk.Session(), id, "failed message") + err := job.FailJob(ctx, "failed message") require.NoError(t, err) - info, err := GetJobInfo(ctx, tk.Session(), id, "user") + info, err := job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "failed message" @@ -229,18 +229,18 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { // create another job and fail it - id, expected = createJob(t, tk.Session(), "user") + job, expected = createJob(t, tk.Session(), "user") - err = StartJob(ctx, tk.Session(), id) + err = job.StartJob(ctx) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobRunning checkEqualIgnoreTimes(t, expected, info) - err = FailJob(ctx, tk.Session(), id, "failed message") + err = job.FailJob(ctx, "failed message") require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "failed message" @@ -248,22 +248,22 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { // test change expected status of a failed job. 
- err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) checkEqualIgnoreTimes(t, expected, info) - err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + err = UpdateJobExpectedStatus(ctx, tk.Session(), job.ID, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id, "user") + info, err = job.GetJobInfo(ctx) require.NoError(t, err) checkEqualIgnoreTimes(t, expected, info) - err = CancelJob(ctx, tk.Session(), id, "user") + err = job.CancelJob(ctx) require.ErrorContains(t, err, "The current job status cannot perform the operation. need status running or paused, but got failed") // add job of another user and test GetAllJobInfo - _, _ = createJob(t, tk.Session(), "user2") + job, _ = createJob(t, tk.Session(), "user2") jobs, err := GetAllJobInfo(ctx, tk.Session(), "user") require.NoError(t, err) @@ -275,8 +275,10 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(jobs)) require.Equal(t, JobPending, jobs[0].Status) + require.Equal(t, job.ID, jobs[0].JobID) - _, err = GetJobInfo(ctx, tk.Session(), jobs[0].JobID, "wrong_user") + job.User = "wrong_user" + _, err = job.GetJobInfo(ctx) require.ErrorContains(t, err, "doesn't exist") } @@ -287,11 +289,11 @@ func TestGetJobStatus(t *testing.T) { // job is created - id, _ := createJob(t, tk.Session(), "user") + job, _ := createJob(t, tk.Session(), "user") // job is pending - status, msg, err := GetJobStatus(ctx, tk.Session(), id) + status, msg, err := job.GetJobStatus(ctx) require.NoError(t, err) require.Equal(t, JobPending, status) require.Equal(t, "", msg) @@ -303,25 +305,26 @@ func TestGetJobStatus(t *testing.T) { t.Cleanup(func() { OfflineThresholdInSec = backup }) - err = StartJob(ctx, tk.Session(), id) + err = job.StartJob(ctx) require.NoError(t, err) - status, msg, err = GetJobStatus(ctx, tk.Session(), id) + status, msg, err = job.GetJobStatus(ctx) require.NoError(t, err) require.Equal(t, JobRunning, status) require.Equal(t, "", msg) // job is finished - err = FinishJob(ctx, tk.Session(), id, "finished message") + err = job.FinishJob(ctx, "finished message") require.NoError(t, err) - status, msg, err = GetJobStatus(ctx, tk.Session(), id) + status, msg, err = job.GetJobStatus(ctx) require.NoError(t, err) require.Equal(t, JobFinished, status) require.Equal(t, "finished message", msg) // wrong ID - status, msg, err = GetJobStatus(ctx, tk.Session(), id+1) + job.ID += 1 + status, msg, err = job.GetJobStatus(ctx) require.NoError(t, err) require.Equal(t, JobFailed, status) require.Contains(t, msg, "doesn't exist") diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 33cf742cb6b8e..cd7a653f1e6ca 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//br/pkg/lightning/verification", "//br/pkg/storage", "//config", + "//executor/asyncloaddata", "//expression", "//kv", "//meta/autoid", @@ -50,6 +51,7 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//tikv", + "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", ], diff --git a/executor/importer/import.go b/executor/importer/import.go index 754127fff2717..62ddfec737fb1 100644 --- 
a/executor/importer/import.go +++ b/executor/importer/import.go @@ -31,6 +31,7 @@ import ( litlog "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/executor/asyncloaddata" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -48,6 +49,7 @@ import ( "github.com/pingcap/tidb/util/stringutil" kvconfig "github.com/tikv/client-go/v2/config" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -182,6 +184,8 @@ type LoadDataController struct { importantSysVars map[string]string dataStore storage.ExternalStorage dataFiles []*mydump.SourceFileMeta + // total data file size in bytes, only initialized when load from remote. + TotalFileSize int64 } func getImportantSysVars(sctx sessionctx.Context) map[string]string { @@ -245,10 +249,10 @@ func NewLoadDataController(userSctx sessionctx.Context, plan *plannercore.LoadDa charset: charset, importantSysVars: getImportantSysVars(userSctx), } - if err := c.initFieldParams(plan); err != nil { + if err := c.initOptions(userSctx, plan.Options); err != nil { return nil, err } - if err := c.initOptions(userSctx, plan.Options); err != nil { + if err := c.initFieldParams(plan); err != nil { return nil, err } @@ -267,9 +271,17 @@ func (e *LoadDataController) initFieldParams(plan *plannercore.LoadData) error { return exeerrors.ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.Format) } - if e.FileLocRef == ast.FileLocClient && e.Format == LoadDataFormatParquet { - // parquet parser need seek around, it's not supported for client local file - return exeerrors.ErrLoadParquetFromLocal + if e.FileLocRef == ast.FileLocClient { + if e.Detached { + return exeerrors.ErrLoadDataLocalUnsupportedOption.FastGenByArgs("DETACHED") + } + if e.Format == LoadDataFormatParquet { + // parquet parser need seek around, it's not supported for client local file + return exeerrors.ErrLoadParquetFromLocal + } + if e.ImportMode == PhysicalImportMode { + return exeerrors.ErrLoadDataLocalUnsupportedOption.FastGenByArgs("import_mode='physical'") + } } if e.Format != LoadDataFormatDelimitedData { @@ -654,6 +666,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { return exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(GetMsgFromBRError(err)) } + var totalSize int64 dataFiles := []*mydump.SourceFileMeta{} idx := strings.IndexByte(path, '*') // simple path when the INFILE represent one file @@ -675,6 +688,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { FileSize: size, Compression: compressTp, }) + totalSize = size } else { commonPrefix := path[:idx] // we only support '*', in order to reuse glob library manually escape the path @@ -693,6 +707,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { FileSize: size, Compression: compressTp, }) + totalSize += size return nil }) if err != nil { @@ -702,6 +717,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { e.dataStore = s e.dataFiles = dataFiles + e.TotalFileSize = totalSize return nil } @@ -805,19 +821,6 @@ func (e *LoadDataController) GetParser( return parser, nil } -// PhysicalImport do physical import. 
-func (e *LoadDataController) PhysicalImport(ctx context.Context) (int64, error) {
-	// todo: implement job
-	importer, err := newTableImporter(ctx, e)
-	if err != nil {
-		return 0, err
-	}
-	defer func() {
-		_ = importer.Close()
-	}()
-	return 0, importer.importTable(ctx)
-}
-
 func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo {
 	tbl := filter.Table{
 		Schema: e.DBName,
@@ -833,6 +836,29 @@ func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo {
 	return res
 }

+// JobImportParam holds the runtime parameters of a job import.
+type JobImportParam struct {
+	Job      *asyncloaddata.Job
+	Group    *errgroup.Group
+	GroupCtx context.Context
+	// Done should be closed at the end of the job.
+	Done chan struct{}
+}
+
+// JobImporter is the interface for importing a job.
+type JobImporter interface {
+	// Param returns the param of the job import.
+	Param() *JobImportParam
+	// Import imports the job.
+	// Import should run in routines spawned by param.Group; when the import
+	// finishes, it should close param.Done.
+	// During the import, param.GroupCtx should be used, so this method takes
+	// no context param.
+	Import()
+	// Result returns the result of the job import.
+	// todo: return a struct
+	Result() string
+	io.Closer
+}
+
 // GetMsgFromBRError get msg from BR error.
 // TODO: add GetMsg() to errors package to replace this function.
 // see TestGetMsgFromBRError for more details.
diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go
index 90ca76a1df312..0329857161ee9 100644
--- a/executor/importer/table_import.go
+++ b/executor/importer/table_import.go
@@ -73,7 +73,8 @@ func prepareSortDir(e *LoadDataController) (string, error) {
 	return sortPath, nil
 }

-func newTableImporter(ctx context.Context, e *LoadDataController) (ti *tableImporter, err error) {
+// NewTableImporter creates a new table importer.
+func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti JobImporter, err error) {
 	idAlloc := kv.NewPanickingAllocators(0)
 	tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
 	if err != nil {
@@ -147,12 +148,13 @@
 	// todo: use a real region size getter
 	regionSizeGetter := &local.TableRegionSizeGetterImpl{}
-	localBackend, err := local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
+	localBackend, err := local.NewBackend(param.GroupCtx, tls, backendConfig, regionSizeGetter)
 	if err != nil {
 		return nil, err
 	}

 	return &tableImporter{
+		JobImportParam:     param,
 		LoadDataController: e,
 		backend:            localBackend,
 		tableCp: &checkpoints.TableCheckpoint{
@@ -176,6 +178,7 @@ func newTableImporter(ctx context.Context, e *LoadDataController) (ti *tableImpo
 }

 type tableImporter struct {
+	*JobImportParam
 	*LoadDataController
 	backend *local.Backend
 	tableCp *checkpoints.TableCheckpoint
@@ -193,7 +196,25 @@ type tableImporter struct {
 	regionSplitKeys int64
 }

-var _ io.Closer = &tableImporter{}
+var _ JobImporter = &tableImporter{}
+
+// Param implements JobImporter.Param.
+func (ti *tableImporter) Param() *JobImportParam {
+	return ti.JobImportParam
+}
+
+// Import implements JobImporter.Import.
+func (ti *tableImporter) Import() {
+	ti.Group.Go(func() error {
+		defer close(ti.Done)
+		return ti.importTable(ti.GroupCtx)
+	})
+}
+
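
The JobImporter contract is small enough to pin down with a stub. Below is a hypothetical minimal implementation (noopImporter is illustrative and not part of this patch; it would live in package importer) showing the two obligations of Import: run under param.Group and close param.Done when the work ends:

// Hypothetical stub, for illustration only.
type noopImporter struct {
	param *JobImportParam
}

func (n *noopImporter) Param() *JobImportParam { return n.param }

// Import spawns its work through param.Group and closes param.Done at
// the end, which is what the progress-update routine waits on.
func (n *noopImporter) Import() {
	n.param.Group.Go(func() error {
		defer close(n.param.Done)
		select {
		case <-n.param.GroupCtx.Done():
			return n.param.GroupCtx.Err()
		default:
			return nil // a real importer moves data here
		}
	})
}

func (n *noopImporter) Result() string { return "" }
func (n *noopImporter) Close() error   { return nil }
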
+// Result implements JobImporter.Result.
+func (ti *tableImporter) Result() string {
+	return ""
+}

 func (ti *tableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
 	info := LoadDataReaderInfo{
@@ -401,6 +422,7 @@ func (ti *tableImporter) importAndCleanup(ctx context.Context, closedEngine *bac
 	return multierr.Combine(importErr, cleanupErr)
 }

+// Close implements the io.Closer interface.
 func (ti *tableImporter) Close() error {
 	ti.backend.Close()
 	return nil
diff --git a/executor/load_data.go b/executor/load_data.go
index 13426f2b4b2e5..df9426c11272d 100644
--- a/executor/load_data.go
+++ b/executor/load_data.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
+	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/executor/asyncloaddata"
 	"github.com/pingcap/tidb/executor/importer"
 	"github.com/pingcap/tidb/expression"
@@ -82,10 +83,6 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 			e.detachHandled = true
 		}
 	case ast.FileLocClient:
-		if e.loadDataWorker.controller.Detached {
-			return exeerrors.ErrLoadDataCantDetachWithLocal
-		}
-
 		// let caller use handleQuerySpecial to read data in this connection
 		sctx := e.loadDataWorker.UserSctx
 		val := sctx.Value(LoadDataVarKey)
@@ -106,14 +103,18 @@ type commitTask struct {
 	fileSize int64
 }

+type planInfo struct {
+	ID          int
+	Columns     []*ast.ColumnName
+	GenColExprs []expression.Expression
+}
+
 // LoadDataWorker does a LOAD DATA job.
 type LoadDataWorker struct {
 	UserSctx sessionctx.Context

-	encodeWorkers []*encodeWorker
-	commitWorkers []*commitWorker
-
 	controller *importer.LoadDataController
+	planInfo   planInfo
 	table      table.Table
 	progress   *asyncloaddata.Progress
@@ -142,6 +143,228 @@ func NewLoadDataWorker(
 		setNonRestrictiveFlags(userSctx.GetSessionVars().StmtCtx)
 	}

+	loadDataWorker := &LoadDataWorker{
+		UserSctx:   userSctx,
+		table:      tbl,
+		controller: controller,
+		planInfo: planInfo{
+			ID:          plan.ID(),
+			Columns:     plan.Columns,
+			GenColExprs: plan.GenCols.Exprs,
+		},
+		progress: asyncloaddata.NewProgress(),
+	}
+	return loadDataWorker, nil
+}
+
+func (e *LoadDataWorker) loadRemote(ctx context.Context) (int64, error) {
+	if err2 := e.controller.InitDataFiles(ctx); err2 != nil {
+		return 0, err2
+	}
+	return e.load(ctx, nil)
+}
+
+// LoadLocal reads from the client connection and does the load data job.
+func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error {
+	_, err := e.load(ctx, r)
+	return err
+}
+
+func (e *LoadDataWorker) load(ctx context.Context, r io.ReadCloser) (jobID int64, err error) {
+	s, err2 := CreateSession(e.UserSctx)
+	if err2 != nil {
+		return 0, err2
+	}
+	defer func() {
+		// if the job is detached and there's no error during init, we will close the session in the detached routine.
+		// otherwise we close the session here.
+ if !e.controller.Detached || err != nil { + CloseSession(s) + } + }() + + sqlExec := s.(sqlexec.SQLExecutor) + + job, err2 := asyncloaddata.CreateLoadDataJob( + ctx, + sqlExec, + e.GetInfilePath(), + e.controller.DBName, + e.table.Meta().Name.O, + e.controller.ImportMode, + e.UserSctx.GetSessionVars().User.String(), + ) + if err2 != nil { + return 0, err2 + } + + importCtx := ctx + if e.controller.Detached { + importCtx = context.Background() + importCtx = kv.WithInternalSourceType(importCtx, kv.InternalLoadData) + } + + jobImporter, err2 := e.getJobImporter(importCtx, job, r) + if err2 != nil { + return 0, err2 + } + + if e.controller.Detached { + go func() { + defer CloseSession(s) + // error is stored in system table, so we can ignore it here + //nolint: errcheck + _ = e.importJob(importCtx, jobImporter) + }() + return job.ID, nil + } + return job.ID, e.importJob(importCtx, jobImporter) +} + +func (e *LoadDataWorker) importJob(ctx context.Context, jobImporter importer.JobImporter) (err error) { + defer func() { + _ = jobImporter.Close() + }() + if e.controller.FileLocRef == ast.FileLocServerOrRemote { + e.progress.SourceFileSize = e.controller.TotalFileSize + } + + param := jobImporter.Param() + job, group, groupCtx, done := param.Job, param.Group, param.GroupCtx, param.Done + + var msg string + defer func() { + job.OnComplete(err, msg) + }() + + err = job.StartJob(ctx) + if err != nil { + return err + } + + // UpdateJobProgress goroutine. + group.Go(func() error { + return job.ProgressUpdateRoutineFn(ctx, done, groupCtx.Done(), e.progress) + }) + jobImporter.Import() + err = group.Wait() + msg = jobImporter.Result() + return err +} + +func (e *LoadDataWorker) getJobImporter(ctx context.Context, job *asyncloaddata.Job, r io.ReadCloser) (importer.JobImporter, error) { + group, groupCtx := errgroup.WithContext(ctx) + param := &importer.JobImportParam{ + Job: job, + Group: group, + GroupCtx: groupCtx, + Done: make(chan struct{}), + } + + if e.controller.ImportMode == importer.LogicalImportMode { + return newLogicalJobImporter(param, e, r) + } + return importer.NewTableImporter(param, e.controller) +} + +// GetInfilePath get infile path. +func (e *LoadDataWorker) GetInfilePath() string { + return e.controller.Path +} + +// GetController get load data controller. +// used in unit test. +func (e *LoadDataWorker) GetController() *importer.LoadDataController { + return e.controller +} + +// TestLoad is a helper function for unit test. 
+func (e *LoadDataWorker) TestLoad(parser mydump.Parser) error { + jobImporter, err2 := newLogicalJobImporter(nil, e, nil) + if err2 != nil { + return err2 + } + err := ResetContextOfStmt(jobImporter.encodeWorkers[0].ctx, &ast.LoadDataStmt{}) + if err != nil { + return err + } + setNonRestrictiveFlags(jobImporter.encodeWorkers[0].ctx.GetSessionVars().StmtCtx) + err = ResetContextOfStmt(jobImporter.commitWorkers[0].ctx, &ast.LoadDataStmt{}) + if err != nil { + return err + } + setNonRestrictiveFlags(jobImporter.commitWorkers[0].ctx.GetSessionVars().StmtCtx) + + ctx := context.Background() + for i := uint64(0); i < jobImporter.controller.IgnoreLines; i++ { + //nolint: errcheck + _ = parser.ReadRow() + } + err = jobImporter.encodeWorkers[0].readOneBatchRows(ctx, parser) + if err != nil { + return err + } + err = sessiontxn.NewTxn(ctx, jobImporter.commitWorkers[0].ctx) + if err != nil { + return err + } + err = jobImporter.commitWorkers[0].checkAndInsertOneBatch( + ctx, + jobImporter.encodeWorkers[0].rows, + jobImporter.encodeWorkers[0].curBatchCnt) + if err != nil { + return err + } + jobImporter.encodeWorkers[0].resetBatch() + jobImporter.commitWorkers[0].ctx.StmtCommit(ctx) + err = jobImporter.commitWorkers[0].ctx.CommitTxn(ctx) + if err != nil { + return err + } + jobImporter.mergeAndSetMessage() + return nil +} + +type logicalJobImporter struct { + *importer.JobImportParam + // only used on interactive load data + userSctx sessionctx.Context + controller *importer.LoadDataController + + encodeWorkers []*encodeWorker + commitWorkers []*commitWorker + readerInfos []importer.LoadDataReaderInfo +} + +var _ importer.JobImporter = &logicalJobImporter{} + +func newLogicalJobImporter(param *importer.JobImportParam, e *LoadDataWorker, r io.ReadCloser) (*logicalJobImporter, error) { + ji := &logicalJobImporter{ + JobImportParam: param, + userSctx: e.UserSctx, + controller: e.controller, + } + compressTp := mydump.ParseCompressionOnFileExtension(e.GetInfilePath()) + compressTp2, err := mydump.ToStorageCompressType(compressTp) + if err != nil { + return nil, err + } + if err := ji.initEncodeCommitWorkers(e); err != nil { + return nil, err + } + if e.controller.FileLocRef == ast.FileLocClient { + ji.readerInfos = []importer.LoadDataReaderInfo{{ + Opener: func(_ context.Context) (io.ReadSeekCloser, error) { + addedSeekReader := NewSimpleSeekerOnReadCloser(r) + return storage.InterceptDecompressReader(addedSeekReader, compressTp2) + }}} + } else { + ji.readerInfos = e.controller.GetLoadDataReaderInfos() + } + return ji, nil +} + +func (ji *logicalJobImporter) initEncodeCommitWorkers(e *LoadDataWorker) (err error) { var createdSessions []sessionctx.Context defer func() { if err != nil { @@ -150,58 +373,44 @@ func NewLoadDataWorker( } } }() - - progress := asyncloaddata.NewProgress() - encodeWorkers := make([]*encodeWorker, 0, controller.ThreadCnt) - commitWorkers := make([]*commitWorker, 0, controller.ThreadCnt) + encodeWorkers := make([]*encodeWorker, 0, e.controller.ThreadCnt) + commitWorkers := make([]*commitWorker, 0, e.controller.ThreadCnt) // TODO: create total ThreadCnt workers rather than 2*ThreadCnt? 
- for i := int64(0); i < controller.ThreadCnt; i++ { - encodeCore, err2 := createInsertValues(userSctx, plan, tbl, controller) + for i := int64(0); i < e.controller.ThreadCnt; i++ { + encodeCore, err2 := ji.createInsertValues(e) if err2 != nil { - return nil, err2 + return err2 } createdSessions = append(createdSessions, encodeCore.ctx) - commitCore, err2 := createInsertValues(userSctx, plan, tbl, controller) + commitCore, err2 := ji.createInsertValues(e) if err2 != nil { - return nil, err2 + return err2 } createdSessions = append(createdSessions, commitCore.ctx) encode := &encodeWorker{ InsertValues: encodeCore, - controller: controller, - killed: &userSctx.GetSessionVars().Killed, + controller: e.controller, + killed: &e.UserSctx.GetSessionVars().Killed, } encode.resetBatch() encodeWorkers = append(encodeWorkers, encode) commit := &commitWorker{ InsertValues: commitCore, - controller: controller, - progress: progress, + controller: e.controller, + progress: e.progress, } commitWorkers = append(commitWorkers, commit) } - - loadDataWorker := &LoadDataWorker{ - UserSctx: userSctx, - encodeWorkers: encodeWorkers, - commitWorkers: commitWorkers, - table: tbl, - controller: controller, - progress: progress, - } - return loadDataWorker, nil + ji.encodeWorkers = encodeWorkers + ji.commitWorkers = commitWorkers + return nil } // createInsertValues creates InsertValues whose session context is a clone of // userSctx. -func createInsertValues( - userSctx sessionctx.Context, - plan *plannercore.LoadData, - tbl table.Table, - controller *importer.LoadDataController, -) (insertVal *InsertValues, err error) { - sysSession, err2 := CreateSession(userSctx) +func (ji *logicalJobImporter) createInsertValues(e *LoadDataWorker) (insertVal *InsertValues, err error) { + sysSession, err2 := CreateSession(e.UserSctx) if err2 != nil { return nil, err2 } @@ -217,21 +426,21 @@ func createInsertValues( } // copy the related variables to the new session // I have no confident that all needed variables are copied :( - fromVars := userSctx.GetSessionVars() + fromVars := e.UserSctx.GetSessionVars() toVars := sysSession.GetSessionVars() toVars.User = fromVars.User toVars.CurrentDB = fromVars.CurrentDB toVars.SQLMode = fromVars.SQLMode - if !controller.Restrictive { + if !e.controller.Restrictive { setNonRestrictiveFlags(toVars.StmtCtx) } toVars.StmtCtx.InitSQLDigest(fromVars.StmtCtx.SQLDigest()) - insertColumns := controller.InsertColumns + insertColumns := e.controller.InsertColumns hasExtraHandle := false for _, col := range insertColumns { if col.Name.L == model.ExtraHandleName.L { - if !userSctx.GetSessionVars().AllowWriteRowID { + if !e.UserSctx.GetSessionVars().AllowWriteRowID { return nil, errors.Errorf("load data statement for _tidb_rowid are not supported") } hasExtraHandle = true @@ -239,11 +448,11 @@ func createInsertValues( } } ret := &InsertValues{ - baseExecutor: newBaseExecutor(sysSession, nil, plan.ID()), - Table: tbl, - Columns: plan.Columns, - GenExprs: plan.GenCols.Exprs, - maxRowsInBatch: uint64(controller.BatchSize), + baseExecutor: newBaseExecutor(sysSession, nil, e.planInfo.ID), + Table: e.table, + Columns: e.planInfo.Columns, + GenExprs: e.planInfo.GenColExprs, + maxRowsInBatch: uint64(e.controller.BatchSize), insertColumns: insertColumns, rowLen: len(insertColumns), hasExtraHandle: hasExtraHandle, @@ -255,173 +464,25 @@ func createInsertValues( return ret, nil } -func (e *LoadDataWorker) closeWorkerSessions() { - for _, w := range e.encodeWorkers { - CloseSession(w.ctx) - } - for _, w := range 
e.commitWorkers { - CloseSession(w.ctx) - } -} - -func (e *LoadDataWorker) loadRemote(ctx context.Context) (int64, error) { - if err2 := e.controller.InitDataFiles(ctx); err2 != nil { - e.closeWorkerSessions() - return 0, err2 - } - - if e.controller.ImportMode == importer.PhysicalImportMode { - // will not use session context - e.closeWorkerSessions() - return e.controller.PhysicalImport(ctx) - } - - dataReaderInfos := e.controller.GetLoadDataReaderInfos() - return e.Load(ctx, dataReaderInfos) -} - -// Load reads from readerInfos and do load data job. -func (e *LoadDataWorker) Load( - ctx context.Context, - readerInfos []importer.LoadDataReaderInfo, -) (int64, error) { - var ( - jobID int64 - err error - ) - - s, err := CreateSession(e.UserSctx) - if err != nil { - return 0, err - } - defer CloseSession(s) - - sqlExec := s.(sqlexec.SQLExecutor) - - jobID, err = asyncloaddata.CreateLoadDataJob( - ctx, - sqlExec, - e.GetInfilePath(), - e.controller.DBName, - e.table.Meta().Name.O, - importer.LogicalImportMode, - e.UserSctx.GetSessionVars().User.String(), - ) - if err != nil { - return 0, err - } - - if e.controller.Detached { - go func() { - detachedCtx := context.Background() - detachedCtx = kv.WithInternalSourceType(detachedCtx, kv.InternalLoadData) - // error is stored in system table, so we can ignore it here - //nolint: errcheck - _ = e.doLoad(detachedCtx, readerInfos, jobID) - }() - return jobID, nil - } - return jobID, e.doLoad(ctx, readerInfos, jobID) +func (ji *logicalJobImporter) Param() *importer.JobImportParam { + return ji.JobImportParam } -// TestSyncCh is used in unit test to synchronize the execution of LOAD DATA. -var TestSyncCh = make(chan struct{}) - -func (e *LoadDataWorker) doLoad( - ctx context.Context, - readerInfos []importer.LoadDataReaderInfo, - jobID int64, -) (err error) { - defer func() { - for _, w := range e.encodeWorkers { - w.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.id, w.stats) - } - for _, w := range e.commitWorkers { - w.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.id, w.stats) - } - e.closeWorkerSessions() - }() - - // get a session for UpdateJobProgress. - s, err := CreateSession(e.UserSctx) - if err != nil { - return err - } - defer CloseSession(s) - - sqlExec := s.(sqlexec.SQLExecutor) - - var msg string - defer func() { - // write the ending status even if user context is canceled. 
- ctx2 := context.Background() - ctx2 = kv.WithInternalSourceType(ctx2, kv.InternalLoadData) - if err == nil { - err2 := asyncloaddata.FinishJob( - ctx2, - sqlExec, - jobID, - msg) - terror.Log(err2) - return - } - errMsg := err.Error() - if errImpl, ok := errors.Cause(err).(*errors.Error); ok { - b, marshalErr := errImpl.MarshalJSON() - if marshalErr == nil { - errMsg = string(b) - } - } - - err2 := asyncloaddata.FailJob(ctx2, sqlExec, jobID, errMsg) - terror.Log(err2) - }() - - failpoint.Inject("AfterCreateLoadDataJob", nil) - failpoint.Inject("SyncAfterCreateLoadDataJob", func() { - TestSyncCh <- struct{}{} - <-TestSyncCh - }) - - totalFilesize := int64(0) - hasErr := false - for _, readerInfo := range readerInfos { - if readerInfo.Remote == nil { - logutil.Logger(ctx).Warn("can not get total file size when LOAD DATA from local file") - hasErr = true - break - } - totalFilesize += readerInfo.Remote.FileSize - } - if !hasErr { - e.progress.SourceFileSize = totalFilesize - } - - err = asyncloaddata.StartJob(ctx, sqlExec, jobID) - if err != nil { - return err - } - - failpoint.Inject("AfterStartJob", nil) - failpoint.Inject("SyncAfterStartJob", func() { - TestSyncCh <- struct{}{} - <-TestSyncCh - }) - - group, groupCtx := errgroup.WithContext(ctx) +// Import implements importer.JobImporter interface. +func (ji *logicalJobImporter) Import() { // main goroutine -> readerInfoCh -> processOneStream goroutines - readerInfoCh := make(chan importer.LoadDataReaderInfo, e.controller.ThreadCnt) + readerInfoCh := make(chan importer.LoadDataReaderInfo, ji.controller.ThreadCnt) // processOneStream goroutines -> commitTaskCh -> commitWork goroutines commitTaskCh := make(chan commitTask, taskQueueSize) // commitWork goroutines -> done -> UpdateJobProgress goroutine - done := make(chan struct{}) + param := ji.JobImportParam // processOneStream goroutines. - group.Go(func() error { - encodeGroup, encodeCtx := errgroup.WithContext(groupCtx) + param.Group.Go(func() error { + encodeGroup, encodeCtx := errgroup.WithContext(param.GroupCtx) - for i := range e.encodeWorkers { - worker := e.encodeWorkers[i] + for i := range ji.encodeWorkers { + worker := ji.encodeWorkers[i] encodeGroup.Go(func() error { err2 := sessiontxn.NewTxn(encodeCtx, worker.ctx) if err2 != nil { @@ -438,11 +499,11 @@ func (e *LoadDataWorker) doLoad( return err2 }) // commitWork goroutines. - group.Go(func() error { - commitGroup, commitCtx := errgroup.WithContext(groupCtx) + param.Group.Go(func() error { + commitGroup, commitCtx := errgroup.WithContext(param.GroupCtx) - for i := range e.commitWorkers { - worker := e.commitWorkers[i] + for i := range ji.commitWorkers { + worker := ji.commitWorkers[i] commitGroup.Go(func() error { failpoint.Inject("BeforeCommitWork", nil) return worker.commitWork(commitCtx, commitTaskCh) @@ -451,51 +512,97 @@ func (e *LoadDataWorker) doLoad( err2 := commitGroup.Wait() if err2 == nil { - close(done) + close(param.Done) } return err2 }) - // UpdateJobProgress goroutine. 
- group.Go(func() error { - ticker := time.NewTicker(time.Duration(asyncloaddata.HeartBeatInSec) * time.Second) - defer ticker.Stop() - - for { - select { - case <-done: - // When done, try to update progress to reach 100% - ok, err2 := asyncloaddata.UpdateJobProgress(ctx, sqlExec, jobID, e.progress.String()) - if !ok || err2 != nil { - logutil.Logger(ctx).Warn("failed to update job progress when finished", - zap.Bool("ok", ok), zap.Error(err2)) - } - return nil - case <-groupCtx.Done(): - return nil - case <-ticker.C: - ok, err2 := asyncloaddata.UpdateJobProgress(ctx, sqlExec, jobID, e.progress.String()) - if err2 != nil { - return err2 - } - if !ok { - return errors.Errorf("failed to update job progress, the job %d is interrupted by user or failed to keepalive", jobID) - } - } - } - }) - for i := range readerInfos { + for i := range ji.readerInfos { select { - case <-groupCtx.Done(): - return groupCtx.Err() - case readerInfoCh <- readerInfos[i]: + case <-param.GroupCtx.Done(): + return + case readerInfoCh <- ji.readerInfos[i]: } } close(readerInfoCh) +} - err = group.Wait() - msg = e.mergeAndSetMessage() - return err +// Result implements the importer.JobImporter interface. +func (ji *logicalJobImporter) Result() string { + return ji.mergeAndSetMessage() +} + +// mergeAndSetMessage merges stats from all used session context and sets info +// message(ERR_LOAD_INFO) generated by LOAD statement to UserSctx. +func (ji *logicalJobImporter) mergeAndSetMessage() string { + var ( + numWarnings uint64 + numAffected uint64 + numRecords uint64 + numDeletes uint64 + numSkipped uint64 + ) + + for _, w := range ji.encodeWorkers { + numWarnings += uint64(w.ctx.GetSessionVars().StmtCtx.WarningCount()) + } + for _, w := range ji.commitWorkers { + commitStmtCtx := w.ctx.GetSessionVars().StmtCtx + numWarnings += uint64(commitStmtCtx.WarningCount()) + numAffected += commitStmtCtx.AffectedRows() + numRecords += commitStmtCtx.RecordRows() + numDeletes += commitStmtCtx.DeletedRows() + numSkipped += commitStmtCtx.RecordRows() - commitStmtCtx.CopiedRows() + } + + if numWarnings > math.MaxUint16 { + numWarnings = math.MaxUint16 + } + + msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) + if !ji.controller.Detached { + userStmtCtx := ji.userSctx.GetSessionVars().StmtCtx + userStmtCtx.SetMessage(msg) + + userStmtCtx.SetAffectedRows(numAffected) + + warns := make([]stmtctx.SQLWarn, numWarnings) + n := 0 + lastInsertID := uint64(0) + for _, w := range ji.encodeWorkers { + n += copy(warns[n:], w.ctx.GetSessionVars().StmtCtx.GetWarnings()) + if w.lastInsertID > lastInsertID { + lastInsertID = w.lastInsertID + } + } + for _, w := range ji.commitWorkers { + n += copy(warns[n:], w.ctx.GetSessionVars().StmtCtx.GetWarnings()) + if w.lastInsertID > lastInsertID { + lastInsertID = w.lastInsertID + } + } + userStmtCtx.SetWarnings(warns) + userStmtCtx.LastInsertID = lastInsertID + } + return msg +} + +// Close implements the importer.JobImporter interface. +func (ji *logicalJobImporter) Close() error { + for _, w := range ji.encodeWorkers { + w.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.id, w.stats) + } + for _, w := range ji.commitWorkers { + w.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.id, w.stats) + } + + for _, w := range ji.encodeWorkers { + CloseSession(w.ctx) + } + for _, w := range ji.commitWorkers { + CloseSession(w.ctx) + } + return nil } // encodeWorker is a sub-worker of LoadDataWorker that dedicated to encode data. 
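
logicalJobImporter.Import above is an errgroup fan-out: the calling goroutine feeds readerInfoCh, encode workers produce commitTasks, commit workers drain them and close Done, and the progress routine waits on Done. The same shape, reduced to single stages with placeholder string tasks (all names below are illustrative, not from the patch):

package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runPipeline sketches the producer -> encode -> commit -> done shape.
func runPipeline(ctx context.Context, inputs []string) error {
	group, groupCtx := errgroup.WithContext(ctx)
	inputCh := make(chan string, 4)
	taskCh := make(chan string, 16)
	done := make(chan struct{})

	// encode stage: inputCh -> taskCh
	group.Go(func() error {
		defer close(taskCh)
		for in := range inputCh {
			select {
			case <-groupCtx.Done():
				return groupCtx.Err()
			case taskCh <- "encoded:" + in:
			}
		}
		return nil
	})

	// commit stage: drains taskCh, then signals done
	group.Go(func() error {
		for range taskCh {
			// commit one batch here
		}
		close(done)
		return nil
	})

	// progress stage: mirrors Job.ProgressUpdateRoutineFn's exit paths
	group.Go(func() error {
		select {
		case <-done:
		case <-groupCtx.Done():
		}
		return nil
	})

	// producer: feed inputCh, honoring cancellation, and always close it
	// so the encode stage can terminate.
	sendErr := func() error {
		defer close(inputCh)
		for _, in := range inputs {
			select {
			case <-groupCtx.Done():
				return groupCtx.Err()
			case inputCh <- in:
			}
		}
		return nil
	}()
	if err := group.Wait(); err != nil {
		return err
	}
	return sendErr
}
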
@@ -719,6 +826,9 @@ func (w *encodeWorker) parserData2TableData(
 	return newRow, nil
 }
 
+// TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.
+var TestSyncCh = make(chan struct{})
+
 // commitWorker is a sub-worker of LoadDataWorker that dedicated to commit data.
 type commitWorker struct {
 	*InsertValues
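
Note: TestSyncCh is unbuffered, so every send is a rendezvous: the executor blocks until the test receives, and vice versa. An executor-side failpoint enabled with `return` does `TestSyncCh <- struct{}{}; <-TestSyncCh`, which freezes the job at a known point while the test inspects state. A test-side sketch with a hypothetical failpoint path (the real tests use the names shown elsewhere in this diff):

    import (
        "testing"

        "github.com/pingcap/failpoint"
        "github.com/pingcap/tidb/executor"
        "github.com/stretchr/testify/require"
    )

    // fpPath is hypothetical; substitute a real failpoint that performs the
    // send/receive pair on executor.TestSyncCh.
    const fpPath = "github.com/pingcap/tidb/executor/SomeSyncPoint"

    func TestFrozenJob(t *testing.T) {
        require.NoError(t, failpoint.Enable(fpPath, `return`))
        defer func() { require.NoError(t, failpoint.Disable(fpPath)) }()

        go func() {
            // run `LOAD DATA ... WITH detached` in another session here
        }()

        <-executor.TestSyncCh // executor is now parked at the sync point
        // ... assert on mysql.load_data_jobs while nothing moves ...
        executor.TestSyncCh <- struct{}{} // release the executor
    }
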
@@ -861,115 +971,6 @@ func (w *commitWorker) addRecordLD(ctx context.Context, row []types.Datum) error
 	return nil
 }
 
-// mergeAndSetMessage merges stats from all used session context and sets info
-// message(ERR_LOAD_INFO) generated by LOAD statement to UserSctx.
-func (e *LoadDataWorker) mergeAndSetMessage() string {
-	var (
-		numWarnings uint64
-		numAffected uint64
-		numRecords  uint64
-		numDeletes  uint64
-		numSkipped  uint64
-	)
-
-	for _, w := range e.encodeWorkers {
-		numWarnings += uint64(w.ctx.GetSessionVars().StmtCtx.WarningCount())
-	}
-	for _, w := range e.commitWorkers {
-		commitStmtCtx := w.ctx.GetSessionVars().StmtCtx
-		numWarnings += uint64(commitStmtCtx.WarningCount())
-		numAffected += commitStmtCtx.AffectedRows()
-		numRecords += commitStmtCtx.RecordRows()
-		numDeletes += commitStmtCtx.DeletedRows()
-		numSkipped += commitStmtCtx.RecordRows() - commitStmtCtx.CopiedRows()
-	}
-
-	if numWarnings > math.MaxUint16 {
-		numWarnings = math.MaxUint16
-	}
-
-	msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings)
-	if !e.controller.Detached {
-		userStmtCtx := e.UserSctx.GetSessionVars().StmtCtx
-		userStmtCtx.SetMessage(msg)
-
-		userStmtCtx.SetAffectedRows(numAffected)
-
-		warns := make([]stmtctx.SQLWarn, numWarnings)
-		n := 0
-		lastInsertID := uint64(0)
-		for _, w := range e.encodeWorkers {
-			n += copy(warns[n:], w.ctx.GetSessionVars().StmtCtx.GetWarnings())
-			if w.lastInsertID > lastInsertID {
-				lastInsertID = w.lastInsertID
-			}
-		}
-		for _, w := range e.commitWorkers {
-			n += copy(warns[n:], w.ctx.GetSessionVars().StmtCtx.GetWarnings())
-			if w.lastInsertID > lastInsertID {
-				lastInsertID = w.lastInsertID
-			}
-		}
-		userStmtCtx.SetWarnings(warns)
-		userStmtCtx.LastInsertID = lastInsertID
-	}
-	return msg
-}
-
-// GetInfilePath get infile path.
-func (e *LoadDataWorker) GetInfilePath() string {
-	return e.controller.Path
-}
-
-// GetController get load data controller.
-// used in unit test.
-func (e *LoadDataWorker) GetController() *importer.LoadDataController {
-	return e.controller
-}
-
-// TestLoad is a helper function for unit test.
-func (e *LoadDataWorker) TestLoad(parser mydump.Parser) error {
-	err := ResetContextOfStmt(e.encodeWorkers[0].ctx, &ast.LoadDataStmt{})
-	if err != nil {
-		return err
-	}
-	setNonRestrictiveFlags(e.encodeWorkers[0].ctx.GetSessionVars().StmtCtx)
-	err = ResetContextOfStmt(e.commitWorkers[0].ctx, &ast.LoadDataStmt{})
-	if err != nil {
-		return err
-	}
-	setNonRestrictiveFlags(e.commitWorkers[0].ctx.GetSessionVars().StmtCtx)
-
-	ctx := context.Background()
-	for i := uint64(0); i < e.controller.IgnoreLines; i++ {
-		//nolint: errcheck
-		_ = parser.ReadRow()
-	}
-	err = e.encodeWorkers[0].readOneBatchRows(ctx, parser)
-	if err != nil {
-		return err
-	}
-	err = sessiontxn.NewTxn(ctx, e.commitWorkers[0].ctx)
-	if err != nil {
-		return err
-	}
-	err = e.commitWorkers[0].checkAndInsertOneBatch(
-		ctx,
-		e.encodeWorkers[0].rows,
-		e.encodeWorkers[0].curBatchCnt)
-	if err != nil {
-		return err
-	}
-	e.encodeWorkers[0].resetBatch()
-	e.commitWorkers[0].ctx.StmtCommit(ctx)
-	err = e.commitWorkers[0].ctx.CommitTxn(ctx)
-	if err != nil {
-		return err
-	}
-	e.mergeAndSetMessage()
-	return nil
-}
-
 var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil)
 
 // SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on ReadCloser.
@@ -1031,12 +1032,13 @@ type LoadDataActionExec struct {
 func (e *LoadDataActionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	sqlExec := e.ctx.(sqlexec.SQLExecutor)
 	user := e.ctx.GetSessionVars().User.String()
+	job := asyncloaddata.NewJob(e.jobID, sqlExec, user)
 
 	switch e.tp {
 	case ast.LoadDataCancel:
-		return asyncloaddata.CancelJob(ctx, sqlExec, e.jobID, user)
+		return job.CancelJob(ctx)
 	case ast.LoadDataDrop:
-		return asyncloaddata.DropJob(ctx, sqlExec, e.jobID, user)
+		return job.DropJob(ctx)
 	default:
 		return errors.Errorf("not implemented LOAD DATA action %v", e.tp)
 	}
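
Note: both call sites above now build an asyncloaddata.Job handle instead of threading (conn, id, user) triples through package-level functions; the ownership check presumably moves inside the methods. A sketch of the resulting call pattern, using only identifiers that appear in this diff (the jobID type and the printed fields are assumptions):

    func operateJob(ctx context.Context, exec sqlexec.SQLExecutor, jobID int64, user string) error {
        job := asyncloaddata.NewJob(jobID, exec, user)

        info, err := job.GetJobInfo(ctx) // backs SHOW LOAD DATA JOB <id>
        if err != nil {
            return err
        }
        fmt.Println(info.Status, info.Progress)

        return job.CancelJob(ctx) // DROP LOAD DATA JOB uses job.DropJob(ctx)
    }
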
diff --git a/executor/loaddatatest/load_data_test.go b/executor/loaddatatest/load_data_test.go
index 7552b40a49605..726ea51bf4b9f 100644
--- a/executor/loaddatatest/load_data_test.go
+++ b/executor/loaddatatest/load_data_test.go
@@ -77,6 +77,10 @@ func TestLoadDataInitParam(t *testing.T) {
 		exeerrors.ErrLoadDataUnsupportedFormat)
 	require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' format 'parquet' into table load_data_test"),
 		exeerrors.ErrLoadParquetFromLocal)
+	require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' into table load_data_test with detached"),
+		exeerrors.ErrLoadDataLocalUnsupportedOption)
+	require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' into table load_data_test with import_mode='physical'"),
+		exeerrors.ErrLoadDataLocalUnsupportedOption)
 	require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'sql file' into table load_data_test fields terminated by 'a'"),
 		"cannot specify FIELDS ... or LINES")
 	require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test fields terminated by 'a'"),
diff --git a/executor/show.go b/executor/show.go
index d37ee3a526a77..d4be28dba2112 100644
--- a/executor/show.go
+++ b/executor/show.go
@@ -2207,7 +2207,8 @@ func (e *ShowExec) fetchShowLoadDataJobs(ctx context.Context) error {
 	}
 
 	if e.LoadDataJobID != nil {
-		info, err := asyncloaddata.GetJobInfo(ctx, exec, *e.LoadDataJobID, e.ctx.GetSessionVars().User.String())
+		job := asyncloaddata.NewJob(*e.LoadDataJobID, exec, e.ctx.GetSessionVars().User.String())
+		info, err := job.GetJobInfo(ctx)
 		if err != nil {
 			return err
 		}
diff --git a/server/BUILD.bazel b/server/BUILD.bazel
index c4295bb94e374..af93cefec6fe8 100644
--- a/server/BUILD.bazel
+++ b/server/BUILD.bazel
@@ -28,15 +28,12 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//autoid_service",
-        "//br/pkg/lightning/mydump",
-        "//br/pkg/storage",
         "//config",
         "//ddl",
         "//domain",
         "//domain/infosync",
         "//errno",
         "//executor",
-        "//executor/importer",
         "//expression",
         "//extension",
         "//infoschema",
diff --git a/server/conn.go b/server/conn.go
index ddc7640dce5f3..b46b4aa75c462 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -56,13 +56,10 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
-	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/executor"
-	"github.com/pingcap/tidb/executor/importer"
 	"github.com/pingcap/tidb/extension"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -1583,12 +1580,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
 		return errors.New("load data info is empty")
 	}
 	infile := loadDataWorker.GetInfilePath()
-	compressTp := mydump.ParseCompressionOnFileExtension(infile)
-	compressTp2, err := mydump.ToStorageCompressType(compressTp)
-	if err != nil {
-		return err
-	}
-	err = cc.writeReq(ctx, infile)
+	err := cc.writeReq(ctx, infile)
 	if err != nil {
 		return err
 	}
@@ -1633,11 +1625,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
 	}()
 
 	ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData)
-	_, err = loadDataWorker.Load(ctx, []importer.LoadDataReaderInfo{{
-		Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
-			addedSeekReader := executor.NewSimpleSeekerOnReadCloser(r)
-			return storage.InterceptDecompressReader(addedSeekReader, compressTp2)
-		}}})
+	err = loadDataWorker.LoadLocal(ctx, r)
 	_ = r.Close()
 	wg.Wait()
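
Note: conn.go no longer needs to know about compression or seekability; handleLoadData just forwards the raw client stream to LoadLocal. The wrapping deleted above presumably moves behind LoadDataWorker.LoadLocal. Reconstructed from the removed lines, it would look roughly like this (the function itself is an assumption, placed in package executor so NewSimpleSeekerOnReadCloser is local; the helpers are exactly the ones deleted from conn.go):

    import (
        "io"

        "github.com/pingcap/tidb/br/pkg/lightning/mydump"
        "github.com/pingcap/tidb/br/pkg/storage"
    )

    // wrapLocalReader is hypothetical; it mirrors the code removed from
    // handleLoadData rather than quoting the actual LoadLocal implementation.
    func wrapLocalReader(path string, r io.ReadCloser) (io.ReadSeekCloser, error) {
        compressTp := mydump.ParseCompressionOnFileExtension(path)
        compressTp2, err := mydump.ToStorageCompressType(compressTp)
        if err != nil {
            return nil, err
        }
        // Seek(0, SeekCurrent) support for progress tracking, then transparent
        // decompression on top of the client stream.
        seeker := NewSimpleSeekerOnReadCloser(r)
        return storage.InterceptDecompressReader(seeker, compressTp2)
    }
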
diff --git a/util/dbterror/exeerrors/errors.go b/util/dbterror/exeerrors/errors.go
index 1c651f8cd99ba..40221aed99d6b 100644
--- a/util/dbterror/exeerrors/errors.go
+++ b/util/dbterror/exeerrors/errors.go
@@ -77,21 +77,21 @@ var (
 	ErrTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil))
 	ErrExistsInHistoryPassword  = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword)
 
-	ErrWarnTooFewRecords         = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooFewRecords)
-	ErrWarnTooManyRecords        = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooManyRecords)
-	ErrLoadDataFromServerDisk    = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk)
-	ErrLoadParquetFromLocal      = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal)
-	ErrLoadDataEmptyPath         = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath)
-	ErrLoadDataUnsupportedFormat = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat)
-	ErrLoadDataInvalidURI        = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI)
-	ErrLoadDataCantAccess        = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess)
-	ErrLoadDataCantRead          = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead)
-	ErrLoadDataWrongFormatConfig = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig)
-	ErrUnknownOption             = dbterror.ClassExecutor.NewStd(mysql.ErrUnknownOption)
-	ErrInvalidOptionVal          = dbterror.ClassExecutor.NewStd(mysql.ErrInvalidOptionVal)
-	ErrDuplicateOption           = dbterror.ClassExecutor.NewStd(mysql.ErrDuplicateOption)
-	ErrLoadDataUnsupportedOption = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedOption)
-	ErrLoadDataJobNotFound       = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataJobNotFound)
-	ErrLoadDataInvalidOperation  = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidOperation)
-	ErrLoadDataCantDetachWithLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantDetachWithLocal)
+	ErrWarnTooFewRecords              = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooFewRecords)
+	ErrWarnTooManyRecords             = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooManyRecords)
+	ErrLoadDataFromServerDisk         = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk)
+	ErrLoadParquetFromLocal           = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal)
+	ErrLoadDataEmptyPath              = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath)
+	ErrLoadDataUnsupportedFormat      = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedFormat)
+	ErrLoadDataInvalidURI             = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidURI)
+	ErrLoadDataCantAccess             = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantAccess)
+	ErrLoadDataCantRead               = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataCantRead)
+	ErrLoadDataWrongFormatConfig      = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataWrongFormatConfig)
+	ErrUnknownOption                  = dbterror.ClassExecutor.NewStd(mysql.ErrUnknownOption)
+	ErrInvalidOptionVal               = dbterror.ClassExecutor.NewStd(mysql.ErrInvalidOptionVal)
+	ErrDuplicateOption                = dbterror.ClassExecutor.NewStd(mysql.ErrDuplicateOption)
+	ErrLoadDataUnsupportedOption      = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedOption)
+	ErrLoadDataJobNotFound            = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataJobNotFound)
+	ErrLoadDataInvalidOperation       = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataInvalidOperation)
+	ErrLoadDataLocalUnsupportedOption = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataLocalUnsupportedOption)
 )
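
Note: the net effect of the rename is that error 8172 is no longer hard-wired to DETACHED; the same code and message now cover any option that is invalid together with LOCAL (the new tests exercise both detached and import_mode='physical'). The offending option is interpolated into the %s placeholder, roughly as below (the exact argument casing is an assumption):

    // Message template comes from errname.go above; "DETACHED" as the
    // argument is illustrative, not taken from the implementation.
    err := exeerrors.ErrLoadDataLocalUnsupportedOption.GenWithStackByArgs("DETACHED")
    // client sees: ERROR 8172 (HY000): Unsupported option for LOAD DATA LOCAL INFILE: DETACHED
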