Skip to content

Commit

Permalink
opt notify job, ut
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed May 13, 2024
1 parent 1de5d27 commit 0efbfd1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 13 deletions.
52 changes: 41 additions & 11 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -366,15 +367,17 @@ type ddlCtx struct {
ownerManager owner.Manager
schemaSyncer syncer.SchemaSyncer
stateSyncer syncer.StateSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *statsutil.DDLEvent
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover
// ddlJobDoneChMap is used to notify the session that the DDL job is finished.
// jobID -> chan struct{}
ddlJobDoneChMap generic.SyncMap[int64, chan struct{}]
ddlEventCh chan<- *statsutil.DDLEvent
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

*waitSchemaSyncedController
*schemaVersionManager
Expand Down Expand Up @@ -618,6 +621,27 @@ func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
rc.notifyJobState(job.State)
}

func (dc *ddlCtx) initJobDoneCh(jobID int64) {
dc.ddlJobDoneChMap.Store(jobID, make(chan struct{}, 1))
}

func (dc *ddlCtx) getJobDoneCh(jobID int64) (chan struct{}, bool) {
return dc.ddlJobDoneChMap.Load(jobID)
}

func (dc *ddlCtx) delJobDoneCh(jobID int64) {
dc.ddlJobDoneChMap.Delete(jobID)
}

func (dc *ddlCtx) notifyJobDone(jobID int64) {
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
select {
case ch <- struct{}{}:
default:
}
}
}

// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
func EnableTiFlashPoll(d any) {
if dd, ok := d.(*ddl); ok {
Expand Down Expand Up @@ -711,7 +735,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
uuid: id,
store: opt.Store,
lease: opt.Lease,
ddlJobDoneCh: make(chan struct{}, 1),
ddlJobDoneChMap: generic.NewSyncMap[int64, chan struct{}](10),
ownerManager: manager,
schemaSyncer: schemaSyncer,
stateSyncer: stateSyncer,
Expand Down Expand Up @@ -1177,6 +1201,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
err := <-task.errChs[0]
defer d.delJobDoneCh(job.ID)
if err != nil {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
Expand Down Expand Up @@ -1214,13 +1239,18 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
recordLastDDLInfo(ctx, historyJob)
}()
i := 0
notifyCh, ok := d.getJobDoneCh(job.ID)
if !ok {
// shouldn't happen, just give it a dummy one
notifyCh = make(chan struct{})
}
for {
failpoint.Inject("storeCloseInLoop", func(_ failpoint.Value) {
_ = d.Stop()
})

select {
case <-d.ddlJobDoneCh:
case <-notifyCh:
case <-ticker.C:
i++
ticker = updateTickerInterval(ticker, 10*d.lease, job, i)
Expand Down
6 changes: 5 additions & 1 deletion pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {

jobTasks = append(jobTasks, job)
injectModifyJobArgFailPoint(job)
// only need it for non-local mode.
if !tasks[0].job.LocalMode {
d.initJobDoneCh(job.ID)
}
}

se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
Expand Down Expand Up @@ -860,7 +864,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
return err
}
CleanupDDLReorgHandles(job, w.sess)
asyncNotify(d.ddlJobDoneCh)
d.notifyJobDone(job.ID)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func decodeJobVersionEvent(kv *mvccpb.KeyValue, tp mvccpb.Event_EventType, prefi
if err != nil {
return 0, "", 0, false
}
// there is Value in DELETE event, so we need to check it.
// there is no Value in DELETE event, so we need to check it.
if tp == mvccpb.PUT {
schemaVer, err = strconv.ParseInt(string(kv.Value), 10, 64)
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions pkg/ddl/syncer/syncer_nokit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncer

import (
"testing"

"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb"
)

func TestDecodeJobVersionEvent(t *testing.T) {
prefix := util.DDLAllSchemaVersionsByJob + "/"
_, _, _, valid := decodeJobVersionEvent(&mvccpb.KeyValue{Key: []byte(prefix + "1")}, mvccpb.PUT, prefix)
require.False(t, valid)
_, _, _, valid = decodeJobVersionEvent(&mvccpb.KeyValue{Key: []byte(prefix + "a/aa")}, mvccpb.PUT, prefix)
require.False(t, valid)
_, _, _, valid = decodeJobVersionEvent(&mvccpb.KeyValue{
Key: []byte(prefix + "1/aa"), Value: []byte("aa")}, mvccpb.PUT, prefix)
require.False(t, valid)
jobID, tidbID, schemaVer, valid := decodeJobVersionEvent(&mvccpb.KeyValue{
Key: []byte(prefix + "1/aa"), Value: []byte("123")}, mvccpb.PUT, prefix)
require.True(t, valid)
require.EqualValues(t, 1, jobID)
require.EqualValues(t, "aa", tidbID)
require.EqualValues(t, 123, schemaVer)
// value is not used on delete
jobID, tidbID, schemaVer, valid = decodeJobVersionEvent(&mvccpb.KeyValue{
Key: []byte(prefix + "1/aa"), Value: []byte("aaaa")}, mvccpb.DELETE, prefix)
require.True(t, valid)
require.EqualValues(t, 1, jobID)
require.EqualValues(t, "aa", tidbID)
require.EqualValues(t, 0, schemaVer)
}

0 comments on commit 0efbfd1

Please sign in to comment.