Skip to content

Commit

Permalink
test: fix unstable TestFrameworkCancelGTask (#44923)
Browse files Browse the repository at this point in the history
close #44904
  • Loading branch information
tangenta authored Jun 26, 2023
1 parent 8d7d961 commit cae4314
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 22 deletions.
27 changes: 5 additions & 22 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,29 +159,12 @@ func DispatchTaskAndCheckSuccess(taskKey string, t *testing.T, v *atomic.Int64)
}

func DispatchAndCancelTask(taskKey string, t *testing.T, v *atomic.Int64) {
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
taskID, err := mgr.AddNewGlobalTask(taskKey, proto.TaskTypeExample, 8, nil)
require.NoError(t, err)
start := time.Now()
mgr.CancelGlobalTask(taskID)
var task *proto.Task
for {
if time.Since(start) > 2*time.Minute {
require.FailNow(t, "timeout")
}

time.Sleep(time.Second)
task, err = mgr.GetGlobalTaskByID(taskID)
require.NoError(t, err)
require.NotNil(t, task)
if task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning && task.State != proto.TaskStateCancelling && task.State != proto.TaskStateReverting {
break
}
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockExecutorRunCancel", "1*return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockExecutorRunCancel"))
}()
task := DispatchTask(taskKey, t)
require.Equal(t, proto.TaskStateReverted, task.State)
require.Equal(t, int64(0), v.Load())
v.Store(0)
}

Expand Down
1 change: 1 addition & 0 deletions disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//disttask/framework/proto",
"//disttask/framework/storage",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//util/logutil",
Expand Down
14 changes: 14 additions & 0 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -260,6 +261,19 @@ func (s *InternalSchedulerImpl) runMinimalTask(minimalTaskCtx context.Context, m
s.onError(errors.New("MockExecutorRunErr"))
}
})
failpoint.Inject("MockExecutorRunCancel", func(val failpoint.Value) {
if taskID, ok := val.(int); ok {
mgr, err := storage.GetTaskManager()
if err != nil {
logutil.BgLogger().Error("get task manager failed", zap.Error(err))
} else {
err = mgr.CancelGlobalTask(int64(taskID))
if err != nil {
logutil.BgLogger().Error("cancel global task failed", zap.Error(err))
}
}
}
})
if err = executor.Run(minimalTaskCtx); err != nil {
s.onError(err)
}
Expand Down

0 comments on commit cae4314

Please sign in to comment.