Skip to content

Commit

Permalink
Merge branch 'master' into example_mend
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jun 11, 2022
2 parents 9212645 + 06737ec commit a0e222d
Show file tree
Hide file tree
Showing 36 changed files with 9,079 additions and 9,444 deletions.
34 changes: 32 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,49 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
atomicutil "go.uber.org/atomic"
)

type testCancelJob struct {
sql string
ok bool
cancelState model.SchemaState
cancelState interface{} // model.SchemaState | []model.SchemaState
onJobBefore bool
onJobUpdate bool
prepareSQL []string
}

type subStates = []model.SchemaState

func testMatchCancelState(t *testing.T, job *model.Job, cancelState interface{}, sql string) bool {
switch v := cancelState.(type) {
case model.SchemaState:
if job.Type == model.ActionMultiSchemaChange {
msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v)
assert.Failf(t, msg, "use []model.SchemaState as cancel states instead")
return false
}
return job.SchemaState == v
case subStates: // For multi-schema change sub-jobs.
if job.MultiSchemaInfo == nil {
msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v)
assert.Failf(t, msg, "use model.SchemaState as the cancel state instead")
return false
}
assert.Equal(t, len(job.MultiSchemaInfo.SubJobs), len(v), sql)
for i, subJobSchemaState := range v {
if job.MultiSchemaInfo.SubJobs[i].SchemaState != subJobSchemaState {
return false
}
}
return true
default:
return false
}
}

var allTestCase = []testCancelJob{
// Add index.
{"create unique index c3_index on t_partition (c1)", true, model.StateWriteReorganization, true, true, nil},
Expand Down Expand Up @@ -246,7 +276,7 @@ func TestCancel(t *testing.T) {
cancelWhenReorgNotStart := false

hookFunc := func(job *model.Job) {
if job.SchemaState == allTestCase[i.Load()].cancelState && !cancel {
if testMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel {
if !cancelWhenReorgNotStart && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
Expand Down
11 changes: 7 additions & 4 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -63,7 +64,6 @@ func TestColumnAdd(t *testing.T) {
var jobID int64
tc.OnJobUpdatedExported = func(job *model.Job) {
jobID = job.ID
require.NoError(t, dom.Reload())
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
switch job.SchemaState {
Expand Down Expand Up @@ -96,11 +96,14 @@ func TestColumnAdd(t *testing.T) {
}
}
tc.OnJobUpdatedExported = func(job *model.Job) {
if job.NotStarted() {
return
}
jobID = job.ID
tbl := external.GetTableByName(t, internal, "test", "t")
if job.SchemaState != model.StatePublic {
for _, col := range tbl.Cols() {
require.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
assert.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
}
}
}
Expand Down Expand Up @@ -224,7 +227,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
return errors.Trace(err)
}
err = checkResult(ctx, writeOnlyTable, writeOnlyTable.WritableCols(), [][]string{
{"1", "2", "<nil>"},
{"1", "2", "3"},
{"2", "3", "3"},
})
if err != nil {
Expand All @@ -236,7 +239,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
return errors.Trace(err)
}
got := fmt.Sprintf("%v", row)
expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(nil)})
expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(3)})
if got != expect {
return errors.Errorf("expect %v, got %v", expect, got)
}
Expand Down
1 change: 1 addition & 0 deletions ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestMain(m *testing.M) {

autoid.SetStep(5000)
ddl.ReorgWaitTimeout = 30 * time.Millisecond
ddl.RunInGoTest = true
ddl.SetBatchInsertDeleteRangeSize(2)

config.UpdateGlobal(func(conf *config.Config) {
Expand Down
13 changes: 0 additions & 13 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}

kvReq.Streaming = false
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
Expand Down Expand Up @@ -111,20 +110,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}

// kvReq.MemTracker is used to trace and control memory usage in DistSQL layer;
// for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it;
// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
// instead of creating a new one for simplification.
if kvReq.Streaming {
return &streamResult{
label: "dag-stream",
sqlType: label,
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: sctx,
feedback: fb,
}, nil
}
return &selectResult{
label: "dag",
resp: resp,
Expand Down
6 changes: 0 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,6 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
return builder
}

// SetStreaming sets "Streaming" flag for "kv.Request".
func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
builder.Request.Streaming = streaming
return builder
}

// SetPaging sets "Paging" flag for "kv.Request".
func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder {
builder.Request.Paging = paging
Expand Down
8 changes: 0 additions & 8 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func TestRequestBuilder1(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -352,7 +351,6 @@ func TestRequestBuilder2(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -400,7 +398,6 @@ func TestRequestBuilder3(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -432,7 +429,6 @@ func TestRequestBuilder4(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetStreaming(true).
SetFromSessionVars(variable.NewSessionVars()).
Build()
require.NoError(t, err)
Expand All @@ -447,7 +443,6 @@ func TestRequestBuilder4(t *testing.T) {
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
Streaming: true,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
Expand Down Expand Up @@ -493,7 +488,6 @@ func TestRequestBuilder5(t *testing.T) {
IsolationLevel: kv.RC,
Priority: 1,
NotFillCache: true,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
Expand Down Expand Up @@ -523,7 +517,6 @@ func TestRequestBuilder6(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: true,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
Expand Down Expand Up @@ -559,7 +552,6 @@ func TestRequestBuilder7(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: replicaRead.replicaReadType,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down
1 change: 0 additions & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ var (

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*streamResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
)

Expand Down
Loading

0 comments on commit a0e222d

Please sign in to comment.