test: fix race on store.Close in dist test (#44691)
close #44692
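
The race: DistExecutionTestContext.Close could call Store.Close while domain shutdown was still in flight, including for domains removed earlier by DeleteServer. The fix keeps removed domains in a new deletedDomains slice and, in Close, shuts every domain (deleted and live) down concurrently through tidbutil.WaitGroupWrapper, waiting for all of them before closing the store.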
ywqzzy authored Jun 15, 2023
1 parent 20dfa24 commit aa33a09
Showing 2 changed files with 28 additions and 15 deletions.
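
The pattern behind the fix, shown here as a minimal self-contained Go sketch with illustrative names (store, domain, and shutdown below are not TiDB types): close every consumer of a shared resource in parallel, wait for all of them, and only then close the resource itself.

    package main

    import (
    	"fmt"
    	"sync"
    )

    type store struct{ closed bool }

    type domain struct{ s *store }

    // Close simulates a domain shutdown that still reads the shared store.
    func (d *domain) Close() {
    	_ = d.s.closed // would race with a concurrent store shutdown
    }

    func main() {
    	s := &store{}
    	domains := []*domain{{s}, {s}, {s}}

    	var wg sync.WaitGroup
    	for _, d := range domains {
    		wg.Add(1)
    		go func(d *domain) {
    			defer wg.Done()
    			d.Close()
    		}(d)
    	}
    	wg.Wait()       // every domain is finished with the store...
    	s.closed = true // ...so shutting it down now cannot race
    	fmt.Println("clean shutdown")
    }

Without the Wait, the final write can overlap the reads inside domain.Close, which is exactly the kind of overlap go test -race reports.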
18 changes: 9 additions & 9 deletions disttask/framework/dispatcher/dispatcher.go
@@ -172,7 +172,7 @@ func (d *dispatcher) DispatchTaskLoop() {
continue
}
// the task is not in runningGTasks set when:
-// owner changed or task is cancelled when status is pending
+// owner changed or task is cancelled when status is pending.
if gTask.State == proto.TaskStateRunning || gTask.State == proto.TaskStateReverting || gTask.State == proto.TaskStateCancelling {
d.setRunningGTask(gTask)
cnt++
@@ -210,7 +210,7 @@ func (d *dispatcher) probeTask(gTask *proto.Task) (isFinished bool, subTaskErr [
if cancelling {
return false, [][]byte{[]byte("cancel")}
}
-// check subtasks failed
+// check subtasks failed.
cnt, err := d.taskMgr.GetSubtaskInStatesCnt(gTask.ID, proto.TaskStateFailed)
if err != nil {
logutil.BgLogger().Warn("check task failed", zap.Int64("task ID", gTask.ID), zap.Error(err))
@@ -224,7 +224,7 @@ func (d *dispatcher) probeTask(gTask *proto.Task) (isFinished bool, subTaskErr [
}
return false, subTaskErr
}
-// check subtasks pending or running
+// check subtasks pending or running.
cnt, err = d.taskMgr.GetSubtaskInStatesCnt(gTask.ID, proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
logutil.BgLogger().Warn("check task failed", zap.Int64("task ID", gTask.ID), zap.Error(err))
@@ -236,7 +236,7 @@ func (d *dispatcher) probeTask(gTask *proto.Task) (isFinished bool, subTaskErr [
return true, nil
}

-// if gTask.State == TaskStateReverting, if will not convert to TaskStateCancelling again
+// if gTask.State == TaskStateReverting, it will not convert to TaskStateCancelling again.
cnt, err := d.taskMgr.GetSubtaskInStatesCnt(gTask.ID, proto.TaskStateRevertPending, proto.TaskStateReverting)
if err != nil {
logutil.BgLogger().Warn("check task failed", zap.Int64("task ID", gTask.ID), zap.Error(err))
@@ -304,7 +304,7 @@ func (d *dispatcher) processFlow(gTask *proto.Task, errStr [][]byte) error {
logutil.BgLogger().Info("process flow, handle an error", zap.Int64("taskID", gTask.ID), zap.Any("err msg", errStr))
return d.processErrFlow(gTask, errStr)
}
-// previous step is finished
+// previous step is finished.
if gTask.State == proto.TaskStateReverting {
// Finish the rollback step.
logutil.BgLogger().Info("process flow, update the task to reverted", zap.Int64("taskID", gTask.ID))
@@ -340,14 +340,14 @@ func (d *dispatcher) updateTask(gTask *proto.Task, gTaskState string, newSubTask

func (d *dispatcher) processErrFlow(gTask *proto.Task, receiveErr [][]byte) error {
// TODO: Maybe it gets GetTaskFlowHandle fails when rolling upgrades.
-// 1. generate the needed global task meta and subTask meta (dist-plan)
+// 1. generate the needed global task meta and subTask meta (dist-plan).
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d.ctx, d, gTask, receiveErr)
if err != nil {
logutil.BgLogger().Warn("handle error failed", zap.Error(err))
return err
}

-// 2. dispatch revert dist-plan to EligibleInstances
+// 2. dispatch revert dist-plan to EligibleInstances.
return d.dispatchSubTask4Revert(gTask, meta)
}

@@ -370,7 +370,7 @@ func (d *dispatcher) dispatchSubTask4Revert(gTask *proto.Task, meta []byte) erro
}

func (d *dispatcher) processNormalFlow(gTask *proto.Task) error {
-// 1. generate the needed global task meta and subTask meta (dist-plan)
+// 1. generate the needed global task meta and subTask meta (dist-plan).
handle := GetTaskFlowHandle(gTask.Type)
if handle == nil {
logutil.BgLogger().Warn("gen gTask flow handle failed, this type handle doesn't register", zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
@@ -388,7 +388,7 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) error {
logutil.BgLogger().Info("process normal flow", zap.Int64("task ID", gTask.ID),
zap.String("state", gTask.State), zap.Uint64("concurrency", gTask.Concurrency), zap.Int("subtasks", len(metas)))

-// 2. dispatch dist-plan to EligibleInstances
+// 2. dispatch dist-plan to EligibleInstances.
return d.dispatchSubTask(gTask, handle, metas)
}

25 changes: 19 additions & 6 deletions testkit/mockstore.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/store/mockstore"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gctuner"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
@@ -69,10 +70,11 @@ func CreateMockStore(t testing.TB, opts ...mockstore.MockTiKVStoreOption) kv.Sto
// DistExecutionTestContext is the context
// that used in Distributed execution test for Dist task framework and DDL.
type DistExecutionTestContext struct {
-Store kv.Storage
-domains []*domain.Domain
-t testing.TB
-mu sync.Mutex
+Store kv.Storage
+domains []*domain.Domain
+deletedDomains []*domain.Domain
+t testing.TB
+mu sync.Mutex
}

// InitOwner select the last domain as DDL owner.
@@ -128,7 +130,10 @@ func (d *DistExecutionTestContext) DeleteServer(idx int) {
d.SetOwner(0)
d.mu.Lock()
}

+d.deletedDomains = append(d.deletedDomains, d.domains[idx])
d.domains = append(d.domains[:idx], d.domains[idx+1:]...)

err := infosync.MockGlobalServerInfoManagerEntry.Delete(idx)
require.NoError(d.t, err)
}
@@ -139,9 +144,17 @@ func (d *DistExecutionTestContext) Close() {
d.mu.Lock()
defer d.mu.Unlock()
gctuner.GlobalMemoryLimitTuner.Stop()

+var wg tidbutil.WaitGroupWrapper
+for _, dom := range d.deletedDomains {
+wg.Run(dom.Close)
+}

for _, dom := range d.domains {
-dom.Close()
+wg.Run(dom.Close)
}

+wg.Wait()
err := d.Store.Close()
require.NoError(d.t, err)
})
@@ -166,7 +179,7 @@ func NewDistExecutionTestContext(t testing.TB, serverNum int) *DistExecutionTest
}

res := DistExecutionTestContext{
-schematracker.UnwrapStorage(store), domains, t, sync.Mutex{}}
+schematracker.UnwrapStorage(store), domains, []*domain.Domain{}, t, sync.Mutex{}}
res.InitOwner()
return &res
}
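
For reference, tidbutil.WaitGroupWrapper itself is not part of this diff. In pingcap/tidb/util it is, as far as I recall, a thin wrapper over sync.WaitGroup, roughly:

    package util

    import "sync"

    // Sketch of tidbutil.WaitGroupWrapper, paraphrased from pingcap/tidb/util;
    // this type is assumed context, not part of the commit above.
    type WaitGroupWrapper struct {
    	sync.WaitGroup
    }

    // Run starts exec in its own goroutine and tracks it on the embedded
    // WaitGroup, so a single Wait blocks until every Run'd function returns.
    func (w *WaitGroupWrapper) Run(exec func()) {
    	w.Add(1)
    	go func() {
    		defer w.Done()
    		exec()
    	}()
    }

Hence wg.Run(dom.Close) closes all domains in parallel, and wg.Wait() guarantees every one of them has returned before Store.Close runs.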