Skip to content

Commit

Permalink
importinto: make cancel wait task done and some fixes (#48928)
Browse files Browse the repository at this point in the history
close #48736
  • Loading branch information
D3Hunter authored Nov 29, 2023
1 parent f17ba4a commit 86df166
Show file tree
Hide file tree
Showing 33 changed files with 426 additions and 361 deletions.
16 changes: 3 additions & 13 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,9 @@ func skipMergeSort(stats []external.MultipleFilesStat) bool {
return external.GetMaxOverlappingTotal(stats) <= external.MergeSortOverlapThreshold
}

// OnErrStage generate error handling stage's plan.
func (*BackfillingDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErrs []error) (meta []byte, err error) {
// We do not need extra meta info when rolling back
logger := logutil.BgLogger().With(
zap.Stringer("type", task.Type),
zap.Int64("task-id", task.ID),
zap.String("step", StepStr(task.Step)),
)
logger.Info("on error stage", zap.Errors("errors", receiveErrs))
firstErr := receiveErrs[0]
task.Error = firstErr

return nil, nil
// OnDone implements dispatcher.Extension interface.
func (*BackfillingDispatcherExt) OnDone(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task) error {
return nil
}

// GetEligibleInstances implements dispatcher.Extension interface.
Expand Down
10 changes: 2 additions & 8 deletions pkg/ddl/backfilling_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/docker/go-units"
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/ddl"
Expand Down Expand Up @@ -89,14 +88,9 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
require.NoError(t, err)
require.Len(t, metas, 0)

// 1.3 test partition table OnErrStage.
errMeta, err := dsp.OnErrStage(context.Background(), nil, gTask, []error{errors.New("mockErr")})
// 1.3 test partition table OnDone.
err = dsp.OnDone(context.Background(), nil, gTask)
require.NoError(t, err)
require.Nil(t, errMeta)

errMeta, err = dsp.OnErrStage(context.Background(), nil, gTask, []error{errors.New("mockErr")})
require.NoError(t, err)
require.Nil(t, errMeta)

/// 2. test non partition table.
// 2.1 empty table
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2099,7 +2099,7 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
if err != nil {
return err
}
err = handle.WaitGlobalTask(ctx, task)
err = handle.WaitGlobalTask(ctx, task.ID)
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.BgLogger().Warn("job paused by user", zap.String("category", "ddl"), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ go_test(
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 15,
shard_count = 16,
deps = [
"//pkg/disttask/framework/dispatcher/mock",
"//pkg/disttask/framework/mock",
Expand Down
43 changes: 25 additions & 18 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dispatcher
import (
"context"
"math/rand"
"strings"
"time"

"github.com/pingcap/errors"
Expand All @@ -39,6 +40,10 @@ const (
MaxSubtaskConcurrency = 256
// DefaultLiveNodesCheckInterval is the tick interval of fetching all server infos from etcd.
DefaultLiveNodesCheckInterval = 2
// for a cancelled task, it's terminal state is reverted or reverted_failed,
// so we use a special error message to indicate that the task is cancelled
// by user.
taskCancelMsg = "cancelled by user"
)

var (
Expand Down Expand Up @@ -236,7 +241,7 @@ func (d *BaseDispatcher) scheduleTask() {
// handle task in cancelling state, dispatch revert subtasks.
func (d *BaseDispatcher) onCancelling() error {
logutil.Logger(d.logCtx).Info("on cancelling state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
errs := []error{errors.New("cancel")}
errs := []error{errors.New(taskCancelMsg)}
return d.onErrHandlingStage(errs)
}

Expand Down Expand Up @@ -303,8 +308,9 @@ func (d *BaseDispatcher) onReverting() error {
return err
}
if cnt == 0 {
// Finish the rollback step.
logutil.Logger(d.logCtx).Info("all reverting tasks finished, update the task to reverted state")
if err = d.OnDone(d.ctx, d, d.Task); err != nil {
return errors.Trace(err)
}
return d.updateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
}
// Wait all subtasks in this stage finished.
Expand Down Expand Up @@ -467,19 +473,9 @@ func (d *BaseDispatcher) updateTask(taskState proto.TaskState, newSubTasks []*pr
}

func (d *BaseDispatcher) onErrHandlingStage(receiveErrs []error) error {
// 1. generate the needed task meta and subTask meta (dist-plan).
meta, err := d.OnErrStage(d.ctx, d, d.Task, receiveErrs)
if err != nil {
// OnErrStage must be retryable, if not, there will have resource leak for tasks.
logutil.Logger(d.logCtx).Warn("handle error failed", zap.Error(err))
return err
}

// 2. dispatch revert dist-plan to EligibleInstances.
return d.dispatchSubTask4Revert(meta)
}
// we only store the first error.
d.Task.Error = receiveErrs[0]

func (d *BaseDispatcher) dispatchSubTask4Revert(meta []byte) error {
var subTasks []*proto.Subtask
// when step of task is `StepInit`, no need to do revert
if d.Task.Step != proto.StepInit {
Expand All @@ -492,7 +488,7 @@ func (d *BaseDispatcher) dispatchSubTask4Revert(meta []byte) error {
subTasks = make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
// reverting subtasks belong to the same step as current active step.
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, meta))
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, []byte("{}")))
}
}
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
Expand Down Expand Up @@ -544,7 +540,10 @@ func (d *BaseDispatcher) onNextStage() (err error) {
taskState := proto.TaskStateRunning
if d.Task.Step == proto.StepDone {
taskState = proto.TaskStateSucceed
logutil.Logger(d.logCtx).Info("all subtasks dispatched and processed, finish the task")
if err = d.OnDone(d.ctx, d, d.Task); err != nil {
err = errors.Trace(err)
return
}
} else {
logutil.Logger(d.logCtx).Info("move to next stage",
zap.Int64("from", int64(currStep)), zap.Int64("to", int64(d.Task.Step)))
Expand Down Expand Up @@ -632,7 +631,10 @@ func (d *BaseDispatcher) handlePlanErr(err error) error {
return err
}
d.Task.Error = err
// state transform: pending -> failed.

if err = d.OnDone(d.ctx, d, d.Task); err != nil {
return errors.Trace(err)
}
return d.updateTask(proto.TaskStateFailed, nil, RetrySQLTimes)
}

Expand Down Expand Up @@ -742,3 +744,8 @@ func (d *BaseDispatcher) WithNewSession(fn func(se sessionctx.Context) error) er
func (d *BaseDispatcher) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error {
return d.taskMgr.WithNewTxn(ctx, fn)
}

// IsCancelledErr checks if the error is a cancelled error.
func IsCancelledErr(err error) bool {
return strings.Contains(err.Error(), taskCancelMsg)
}
18 changes: 8 additions & 10 deletions pkg/disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ func getTestDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension {
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockDispatcher
}

Expand Down Expand Up @@ -110,11 +106,7 @@ func getNumberExampleDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockDispatcher
}

Expand Down Expand Up @@ -515,3 +507,9 @@ func TestVerifyTaskStateTransform(t *testing.T) {
require.Equal(t, tc.expect, dispatcher.VerifyTaskStateTransform(tc.oldState, tc.newState))
}
}

func TestIsCancelledErr(t *testing.T) {
require.False(t, dispatcher.IsCancelledErr(errors.New("some err")))
require.False(t, dispatcher.IsCancelledErr(context.Canceled))
require.True(t, dispatcher.IsCancelledErr(errors.New("cancelled by user")))
}
9 changes: 5 additions & 4 deletions pkg/disttask/framework/dispatcher/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ type Extension interface {
// when next step is StepDone, it should return nil, nil.
OnNextSubtasksBatch(ctx context.Context, h TaskHandle, task *proto.Task, serverInfo []*infosync.ServerInfo, step proto.Step) (subtaskMetas [][]byte, err error)

// OnErrStage is called when:
// 1. subtask is finished with error.
// 2. task is cancelled after we have dispatched some subtasks.
OnErrStage(ctx context.Context, h TaskHandle, task *proto.Task, receiveErrs []error) (subtaskMeta []byte, err error)
// OnDone is called when task is done, either finished successfully or failed
// with error.
// if the task is failed when initializing dispatcher, or it's an unknown task,
// we don't call this function.
OnDone(ctx context.Context, h TaskHandle, task *proto.Task) error

// GetEligibleInstances is used to get the eligible instances for the task.
// on certain condition we may want to use some instances to do the task, such as instances with more disk.
Expand Down
33 changes: 18 additions & 15 deletions pkg/disttask/framework/dispatcher/mock/dispatcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions pkg/disttask/framework/framework_dynamic_dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ func getMockDynamicDispatchExt(ctrl *gomock.Controller) dispatcher.Extension {
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

return mockDispatcher
}
Expand Down
19 changes: 5 additions & 14 deletions pkg/disttask/framework/framework_err_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func getPlanNotRetryableErrDispatcherExt(ctrl *gomock.Controller) dispatcher.Ext
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, errors.New("not retryable err")
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockDispatcher
}

Expand Down Expand Up @@ -104,15 +100,10 @@ func getPlanErrDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension {
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
if callTime == 1 {
callTime++
return nil, errors.New("not retryable err")
}
return []byte("planErrTask"), nil
},
).AnyTimes()
gomock.InOrder(
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("not retryable err")),
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
)
return mockDispatcher
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/framework_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ func getMockHATestDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension {
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

return mockDispatcher
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/framework_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ func getMockRollbackDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return []byte("rollbacktask1"), nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockDispatcher
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ func getMockBasicDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension {
},
).AnyTimes()

mockDispatcher.EXPECT().OnErrStage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ []error) (meta []byte, err error) {
return nil, nil
},
).AnyTimes()
mockDispatcher.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return mockDispatcher
}

Expand Down
Loading

0 comments on commit 86df166

Please sign in to comment.