Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load data: physical mode part2 #42883

Merged
merged 9 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ const (
ErrLoadDataUnsupportedOption = 8166
ErrLoadDataJobNotFound = 8170
ErrLoadDataInvalidOperation = 8171
ErrLoadDataCantDetachWithLocal = 8172
ErrLoadDataLocalUnsupportedOption = 8172

// Error codes used by TiDB ddl package
ErrUnsupportedDDLOperation = 8200
Expand Down
2 changes: 1 addition & 1 deletion errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil),
ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil),
ErrLoadDataInvalidOperation: mysql.Message("The current job status cannot perform the operation. %s", nil),
ErrLoadDataCantDetachWithLocal: mysql.Message("The job can not be DETACHED when LOAD DATA LOCAL INFILE", nil),
ErrLoadDataLocalUnsupportedOption: mysql.Message("Unsupported option for LOAD DATA LOCAL INFILE: %s", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil),
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ The current job status cannot perform the operation. %s

["executor:8172"]
error = '''
The job can not be DETACHED when LOAD DATA LOCAL INFILE
Unsupported option for LOAD DATA LOCAL INFILE: %s
'''

["executor:8212"]
Expand Down
3 changes: 3 additions & 0 deletions executor/asyncloaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ go_library(
"//types",
"//util/chunk",
"//util/dbterror/exeerrors",
"//util/logutil",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

Expand Down
4 changes: 2 additions & 2 deletions executor/asyncloaddata/operate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (s *mockGCSSuite) TestOperateRunningJob() {
HeartBeatInSec = backup
})

s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCreateLoadDataJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/AfterCreateLoadDataJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/AfterStartJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(1000)`)
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s'
INTO TABLE test_operate.t WITH batch_size = 1;`, gcsEndpoint)
Expand Down
4 changes: 2 additions & 2 deletions executor/asyncloaddata/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type Progress struct {
// SourceFileSize is the size of the source file in bytes. When we can't get
// the size of the source file, it will be set to -1.
// Currently the value is read by seek(0, end), when LOAD DATA LOCAL we wrap
// Currently, the value is read by seek(0, end). For LOAD DATA LOCAL the
// source is a SimpleSeekerOnReadCloser wrapped around the MySQL client
// connection, which doesn't support that seek.
SourceFileSize int64
Expand Down Expand Up @@ -52,7 +52,7 @@ func (p *Progress) String() string {
return string(bs)
}

// ProgressFromJSON creates a Progress from a JSON string.
// ProgressFromJSON decodes JSON-encoded bytes into a Progress.
func ProgressFromJSON(bs []byte) (*Progress, error) {
var p Progress
err := json.Unmarshal(bs, &p)
Expand Down
30 changes: 18 additions & 12 deletions executor/asyncloaddata/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
userStr := tk2.Session().GetSessionVars().User.String()

// wait for the load data job to be created
<-executor.TestSyncCh
<-TestSyncCh

jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), userStr)
require.NoError(s.T(), err)
Expand Down Expand Up @@ -319,12 +319,18 @@ func (s *mockGCSSuite) TestInternalStatus() {
r.checkIgnoreTimes(s.T(), row)

// resume the load data job
executor.TestSyncCh <- struct{}{}
TestSyncCh <- struct{}{}

// wait for the load data job to be started
<-executor.TestSyncCh
<-TestSyncCh

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
job := &Job{
ID: id,
Conn: tk2.Session(),
User: userStr,
}

info, err = job.GetJobInfo(ctx)
require.NoError(s.T(), err)
expected.StartTime = info.StartTime
expected.Status = JobRunning
Expand All @@ -337,20 +343,20 @@ func (s *mockGCSSuite) TestInternalStatus() {
r.checkIgnoreTimes(s.T(), row)

// resume the load data job
executor.TestSyncCh <- struct{}{}
TestSyncCh <- struct{}{}

// wait for the first task to be committed
<-executor.TestSyncCh

// wait for UpdateJobProgress
require.Eventually(s.T(), func() bool {
info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
info, err = job.GetJobInfo(ctx)
if err != nil {
return false
}
return info.Progress == `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}`
}, 6*time.Second, time.Millisecond*100)
info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
info, err = job.GetJobInfo(ctx)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}`
require.Equal(s.T(), expected, info)
Expand All @@ -370,7 +376,7 @@ func (s *mockGCSSuite) TestInternalStatus() {

// wait for UpdateJobProgress
require.Eventually(s.T(), func() bool {
info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
info, err = job.GetJobInfo(ctx)
if err != nil {
return false
}
Expand All @@ -387,14 +393,14 @@ func (s *mockGCSSuite) TestInternalStatus() {
executor.TestSyncCh <- struct{}{}

require.Eventually(s.T(), func() bool {
info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
info, err = job.GetJobInfo(ctx)
if err != nil {
return false
}
return info.Status == JobFinished
}, 6*time.Second, 100*time.Millisecond)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
info, err = job.GetJobInfo(ctx)
require.NoError(s.T(), err)
expected.Status = JobFinished
expected.EndTime = info.EndTime
Expand Down Expand Up @@ -427,8 +433,8 @@ func (s *mockGCSSuite) TestInternalStatus() {
config.BufferSizeScale = backup3
})

s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCreateLoadDataJob", `return`)
s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterStartJob", `return`)
s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/SyncAfterCreateLoadDataJob", `return`)
s.enableFailpoint("github.com/pingcap/tidb/executor/asyncloaddata/SyncAfterStartJob", `return`)
s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCommitOneTask", `return`)
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t*.tsv?endpoint=%s'
INTO TABLE load_tsv.t WITH batch_size = 1;`, gcsEndpoint)
Expand Down
Loading