Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix a bug that update statement uses point get and update plan with different tblInfo (#54183) #54259

Merged
114 changes: 114 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"context"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -142,6 +143,119 @@

// ****************************** End of Default DDL Callback Instance *********************************************

// ****************************** Start of Test DDL Callback Instance ***********************************************
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a code move in the file, for compatibility with the current PR test call issue.


// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}

Check warning on line 175 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L174-L175

Added lines #L174 - L175 were not covered by tests
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}

Check warning on line 185 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

Check warning on line 204 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L202-L204

Added lines #L202 - L204 were not covered by tests

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

Check warning on line 219 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L217-L219

Added lines #L217 - L219 were not covered by tests

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

Check warning on line 229 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L225-L229

Added lines #L225 - L229 were not covered by tests

tc.BaseCallback.OnWatched(ctx)

Check warning on line 231 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L231

Added line #L231 was not covered by tests
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}

Check warning on line 239 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L237-L239

Added lines #L237 - L239 were not covered by tests
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}

Check warning on line 248 in ddl/callback.go

View check run for this annotation

Codecov / codecov/patch

ddl/callback.go#L246-L248

Added lines #L246 - L248 were not covered by tests
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

// ****************************** End of Test DDL Callback Instance ***********************************************

// ****************************** Start of CTC DDL Callback Instance ***********************************************

// ctcCallback is the customized callback that TiDB will use.
Expand Down
113 changes: 0 additions & 113 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ package ddl

import (
"context"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

type TestInterceptor struct {
Expand All @@ -41,115 +37,6 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
}

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

func TestCallback(t *testing.T) {
cb := &BaseCallback{}
require.Nil(t, cb.OnChanged(nil))
Expand Down
9 changes: 7 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -94,7 +94,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
Loading
Loading