Skip to content

Commit

Permalink
domain: stop schema validator to prohibit DML executing when tidb los…
Browse files Browse the repository at this point in the history
…t session of etcd (#8441) (#8494)
  • Loading branch information
winkyao authored and zz-jason committed Nov 28, 2018
1 parent ced80e5 commit baaca37
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 17 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
var syncer SchemaSyncer
if etcdCli == nil {
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and mockSchemaSyncer.
// So we use mockOwnerManager and MockSchemaSyncer.
manager = owner.NewMockManager(id, cancelFunc)
syncer = NewMockSchemaSyncer()
} else {
Expand Down
42 changes: 42 additions & 0 deletions ddl/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -150,3 +152,43 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
_, err = s.se.Execute(context.Background(), "drop database cancel_job_db")
c.Assert(err, IsNil)
}

// TestFailSchemaSyncer tests that when the schema syncer is done, DML
// execution is prohibited until the syncer is restarted by loadSchemaInLoop.
func (s *testDBSuite) TestFailSchemaSyncer(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int)")
	defer tk.MustExec("drop table if exists t")
	c.Assert(s.dom.SchemaValidator.IsStarted(), IsTrue)
	mockSyncer, ok := s.dom.DDL().SchemaSyncer().(*ddl.MockSchemaSyncer)
	c.Assert(ok, IsTrue)

	// Make reload fail, then close the mock session so the syncer reports Done.
	s.dom.MockReloadFailed.SetValue(true)
	mockSyncer.CloseSession()
	// Wait (at most 50 polls, 20ms apart) until the schema validator is stopped.
	for i := 0; i < 50 && s.dom.SchemaValidator.IsStarted(); i++ {
		time.Sleep(20 * time.Millisecond)
	}

	// While the validator is stopped, DML must be rejected.
	c.Assert(s.dom.SchemaValidator.IsStarted(), IsFalse)
	_, err := tk.Exec("insert into t values(1)")
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[domain:1]Information schema is out of date.")

	// Let reload succeed again and wait (at most 50 polls, 100ms apart)
	// until the schema validator is started.
	s.dom.MockReloadFailed.SetValue(false)
	for i := 0; i < 50 && !s.dom.SchemaValidator.IsStarted(); i++ {
		time.Sleep(100 * time.Millisecond)
	}

	// Once the validator is running again, DML must succeed.
	c.Assert(s.dom.SchemaValidator.IsStarted(), IsTrue)
	_, err = tk.Exec("insert into t values(1)")
	c.Assert(err, IsNil)
}
39 changes: 25 additions & 14 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,53 +25,64 @@ import (
"golang.org/x/net/context"
)

var _ SchemaSyncer = &mockSchemaSyncer{}
var _ SchemaSyncer = &MockSchemaSyncer{}

const mockCheckVersInterval = 2 * time.Millisecond

type mockSchemaSyncer struct {
// MockSchemaSyncer is a mock schema syncer, it is exported for testing.
type MockSchemaSyncer struct {
// selfSchemaVersion holds the version last stored by UpdateSelfVersion (written atomically).
selfSchemaVersion int64
// globalVerCh is the channel returned by GlobalVersionCh; Init creates it with a buffer of 1.
globalVerCh chan clientv3.WatchResponse
// mockSession is the channel returned by Done; CloseSession closes it and Restart replaces it.
mockSession chan struct{}
}

// NewMockSchemaSyncer creates a new mock SchemaSyncer.
func NewMockSchemaSyncer() SchemaSyncer {
return &mockSchemaSyncer{}
return &MockSchemaSyncer{}
}

// Init implements SchemaSyncer.Init interface.
func (s *mockSchemaSyncer) Init(ctx context.Context) error {
func (s *MockSchemaSyncer) Init(ctx context.Context) error {
s.globalVerCh = make(chan clientv3.WatchResponse, 1)
s.mockSession = make(chan struct{}, 1)
return nil
}

// GlobalVersionCh implements SchemaSyncer.GlobalVersionCh interface.
func (s *mockSchemaSyncer) GlobalVersionCh() clientv3.WatchChan {
func (s *MockSchemaSyncer) GlobalVersionCh() clientv3.WatchChan {
return s.globalVerCh
}

// WatchGlobalSchemaVer implements SchemaSyncer.WatchGlobalSchemaVer interface.
func (s *mockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {}
func (s *MockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {}

// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
func (s *mockSchemaSyncer) UpdateSelfVersion(ctx context.Context, version int64) error {
func (s *MockSchemaSyncer) UpdateSelfVersion(ctx context.Context, version int64) error {
atomic.StoreInt64(&s.selfSchemaVersion, version)
return nil
}

// Done implements SchemaSyncer.Done interface.
func (s *mockSchemaSyncer) Done() <-chan struct{} {
return make(chan struct{}, 1)
func (s *MockSchemaSyncer) Done() <-chan struct{} {
return s.mockSession
}

// CloseSession closes the mockSession channel, which makes the channel
// returned by Done readable. It is exported for testing.
// Calling it twice without an intervening Restart panics, because the
// same channel would be closed twice.
func (s *MockSchemaSyncer) CloseSession() {
close(s.mockSession)
}

// Restart implements SchemaSyncer.Restart interface.
func (s *mockSchemaSyncer) Restart(_ context.Context) error { return nil }
func (s *MockSchemaSyncer) Restart(_ context.Context) error {
// Install a fresh channel so that receiving from Done() blocks again,
// even if CloseSession closed the previous one.
s.mockSession = make(chan struct{}, 1)
return nil
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
func (s *mockSchemaSyncer) RemoveSelfVersionPath() error { return nil }
func (s *MockSchemaSyncer) RemoveSelfVersionPath() error { return nil }

// OwnerUpdateGlobalVersion implements SchemaSyncer.OwnerUpdateGlobalVersion interface.
func (s *mockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version int64) error {
func (s *MockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version int64) error {
select {
case s.globalVerCh <- clientv3.WatchResponse{}:
default:
Expand All @@ -80,12 +91,12 @@ func (s *mockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version
}

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *mockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
func (s *MockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
return 0, nil
}

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *mockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
ticker := time.NewTicker(mockCheckVersInterval)
defer ticker.Stop()

Expand Down
38 changes: 38 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,26 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
// Etcd is responsible for schema synchronization. We should ensure there are at most two different schema versions
// in the TiDB cluster, to keep the data/schema consistent. If we lose the connection/session to etcd, the cluster
// will treat this TiDB as a down instance, and etcd will remove the key of `/tidb/ddl/all_schema_versions/tidb-id`.
// Say the schema version now is 1 and the owner is changing the schema version to 2: it will not wait for this down TiDB to sync the schema,
// and will then continue to change the TiDB schema to version 3. Unfortunately, this down TiDB's schema version will still be version 1,
// and version 1 is not consistent with version 3. So we need to stop the schema validator to prohibit DML from executing.
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
// The schema maybe changed, must reload schema then the schema validator can restart.
exitLoop := do.mustReload()
if exitLoop {
// domain is closed.
log.Errorf("[ddl] domain is closed. exit loadSchemaInLoop")
return
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
case <-do.exit:
return
Expand Down Expand Up @@ -388,6 +403,29 @@ func (do *Domain) mustRestartSyncer() error {
}
}

// mustReload tries to Reload the schema, retrying until it succeeds or the
// domain is closed.
// It returns false when the reload is successful, and true when the domain is
// closed (the caller should then exit its loop).
func (do *Domain) mustReload() (exitLoop bool) {
	for {
		err := do.Reload()
		if err == nil {
			log.Infof("[ddl] mustReload succeed.")
			return false
		}

		log.Infof("[ddl] reload the schema failed: %v", err)
		// Back off before retrying, but return immediately if the domain is
		// closed, including while we are waiting, so that shutdown is not
		// delayed by the retry interval.
		select {
		case <-do.exit:
			log.Infof("[ddl] domain is closed.")
			return true
		case <-time.After(200 * time.Millisecond):
		}
	}
}

// Close closes the Domain and release its resource.
func (do *Domain) Close() {
if do.ddl != nil {
Expand Down
11 changes: 10 additions & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type SchemaValidator interface {
Restart()
// Reset resets SchemaValidator to initial state.
Reset()
// IsStarted indicates whether SchemaValidator is started.
IsStarted() bool
}

type deltaSchemaInfo struct {
Expand All @@ -72,6 +74,13 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator {
}
}

// IsStarted implements SchemaValidator.IsStarted interface.
// It only reads isStarted, so it takes the read lock (as Check does) rather
// than the exclusive lock, and does not block other concurrent readers.
func (s *schemaValidator) IsStarted() bool {
	s.mux.RLock()
	defer s.mux.RUnlock()
	return s.isStarted
}

func (s *schemaValidator) Stop() {
log.Info("[domain-ddl] the schema validator stops")
s.mux.Lock()
Expand Down Expand Up @@ -165,7 +174,7 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [
defer s.mux.RUnlock()
if !s.isStarted {
log.Infof("[domain-ddl] the schema validator stopped before checking")
return ResultFail
return ResultUnknown
}
if s.lease == 0 {
return ResultSucc
Expand Down
2 changes: 1 addition & 1 deletion domain/schema_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (*testSuite) TestSchemaValidator(c *C) {
isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10})
c.Assert(isTablesChanged, IsTrue)
valid = validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10})
c.Assert(valid, Equals, ResultFail)
c.Assert(valid, Equals, ResultUnknown)
validator.Restart()

// Sleep for a long time, check schema is invalid.
Expand Down

0 comments on commit baaca37

Please sign in to comment.