diff --git a/ddl/ddl.go b/ddl/ddl.go index 61ddd49bf92c5..59b2b87081599 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -264,7 +264,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, var syncer SchemaSyncer if etcdCli == nil { // The etcdCli is nil if the store is localstore which is only used for testing. - // So we use mockOwnerManager and mockSchemaSyncer. + // So we use mockOwnerManager and MockSchemaSyncer. manager = owner.NewMockManager(id, cancelFunc) syncer = NewMockSchemaSyncer() } else { diff --git a/ddl/fail_db_test.go b/ddl/fail_db_test.go index 50b010e9bd63e..aec1cc57ea09d 100644 --- a/ddl/fail_db_test.go +++ b/ddl/fail_db_test.go @@ -18,11 +18,13 @@ import ( gofail "github.com/etcd-io/gofail/runtime" . "github.com/pingcap/check" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" "golang.org/x/net/context" ) @@ -150,3 +152,43 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) { _, err = s.se.Execute(context.Background(), "drop database cancel_job_db") c.Assert(err, IsNil) } + +// TestFailSchemaSyncer tests that when the schema syncer is done, +// DML execution is prohibited until the syncer is restarted by loadSchemaInLoop. +func (s *testDBSuite) TestFailSchemaSyncer(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + defer tk.MustExec("drop table if exists t") + c.Assert(s.dom.SchemaValidator.IsStarted(), IsTrue) + mockSyncer, ok := s.dom.DDL().SchemaSyncer().(*ddl.MockSchemaSyncer) + c.Assert(ok, IsTrue) + + // make reload fail. + s.dom.MockReloadFailed.SetValue(true) + mockSyncer.CloseSession() + // wait until the schemaValidator is stopped. 
+ for i := 0; i < 50; i++ { + if s.dom.SchemaValidator.IsStarted() == false { + break + } + time.Sleep(20 * time.Millisecond) + } + + c.Assert(s.dom.SchemaValidator.IsStarted(), IsFalse) + _, err := tk.Exec("insert into t values(1)") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[domain:1]Information schema is out of date.") + s.dom.MockReloadFailed.SetValue(false) + // wait until the schemaValidator is started. + for i := 0; i < 50; i++ { + if s.dom.SchemaValidator.IsStarted() == true { + break + } + time.Sleep(100 * time.Millisecond) + } + c.Assert(s.dom.SchemaValidator.IsStarted(), IsTrue) + _, err = tk.Exec("insert into t values(1)") + c.Assert(err, IsNil) +} diff --git a/ddl/mock.go b/ddl/mock.go index 70562e9a496cd..cf7312318cd93 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -25,53 +25,64 @@ import ( "golang.org/x/net/context" ) -var _ SchemaSyncer = &mockSchemaSyncer{} +var _ SchemaSyncer = &MockSchemaSyncer{} const mockCheckVersInterval = 2 * time.Millisecond -type mockSchemaSyncer struct { +// MockSchemaSyncer is a mock schema syncer, it is exported for testing. +type MockSchemaSyncer struct { selfSchemaVersion int64 globalVerCh chan clientv3.WatchResponse + mockSession chan struct{} } // NewMockSchemaSyncer creates a new mock SchemaSyncer. func NewMockSchemaSyncer() SchemaSyncer { - return &mockSchemaSyncer{} + return &MockSchemaSyncer{} } // Init implements SchemaSyncer.Init interface. -func (s *mockSchemaSyncer) Init(ctx context.Context) error { +func (s *MockSchemaSyncer) Init(ctx context.Context) error { s.globalVerCh = make(chan clientv3.WatchResponse, 1) + s.mockSession = make(chan struct{}, 1) return nil } // GlobalVersionCh implements SchemaSyncer.GlobalVersionCh interface. -func (s *mockSchemaSyncer) GlobalVersionCh() clientv3.WatchChan { +func (s *MockSchemaSyncer) GlobalVersionCh() clientv3.WatchChan { return s.globalVerCh } // WatchGlobalSchemaVer implements SchemaSyncer.WatchGlobalSchemaVer interface. 
-func (s *mockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {} +func (s *MockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {} // UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface. -func (s *mockSchemaSyncer) UpdateSelfVersion(ctx context.Context, version int64) error { +func (s *MockSchemaSyncer) UpdateSelfVersion(ctx context.Context, version int64) error { atomic.StoreInt64(&s.selfSchemaVersion, version) return nil } // Done implements SchemaSyncer.Done interface. -func (s *mockSchemaSyncer) Done() <-chan struct{} { - return make(chan struct{}, 1) +func (s *MockSchemaSyncer) Done() <-chan struct{} { + return s.mockSession +} + +// CloseSession closes the mockSession channel, it is exported for testing. +func (s *MockSchemaSyncer) CloseSession() { + close(s.mockSession) } // Restart implements SchemaSyncer.Restart interface. -func (s *mockSchemaSyncer) Restart(_ context.Context) error { return nil } +func (s *MockSchemaSyncer) Restart(_ context.Context) error { + s.mockSession = make(chan struct{}, 1) + return nil +} // RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface. -func (s *mockSchemaSyncer) RemoveSelfVersionPath() error { return nil } +func (s *MockSchemaSyncer) RemoveSelfVersionPath() error { return nil } // OwnerUpdateGlobalVersion implements SchemaSyncer.OwnerUpdateGlobalVersion interface. -func (s *mockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version int64) error { +func (s *MockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version int64) error { select { case s.globalVerCh <- clientv3.WatchResponse{}: default: @@ -80,12 +91,12 @@ func (s *mockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version } // MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface. 
-func (s *mockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) { +func (s *MockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) { return 0, nil } // OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface. -func (s *mockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error { +func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error { ticker := time.NewTicker(mockCheckVersInterval) defer ticker.Stop() diff --git a/domain/domain.go b/domain/domain.go index 544f451c7099f..0763c93dc5ced 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -354,11 +354,26 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { case <-syncer.Done(): // The schema syncer stops, we need stop the schema validator to synchronize the schema version. log.Info("[ddl] reload schema in loop, schema syncer need restart") + // The etcd is responsible for schema synchronization, we should ensure there are at most two different schema versions + // in the TiDB cluster, to keep the data/schema consistent. If we lose the connection/session to etcd, the cluster + // will treat this TiDB as a down instance, and etcd will remove the key of `/tidb/ddl/all_schema_versions/tidb-id`. + // Say the schema version now is 1, the owner is changing the schema version to 2, it will not wait for this down TiDB to sync the schema, + // then continue to change the TiDB schema to version 3. Unfortunately, this down TiDB's schema version will still be version 1. + // And version 1 is not consistent with version 3. So we need to stop the schema validator to prohibit DML from executing. + do.SchemaValidator.Stop() err := do.mustRestartSyncer() if err != nil { log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err)) break } + // The schema may have changed, so we must reload the schema before the schema validator can restart. 
+ exitLoop := do.mustReload() + if exitLoop { + // domain is closed. + log.Errorf("[ddl] domain is closed. exit loadSchemaInLoop") + return + } + do.SchemaValidator.Restart() log.Info("[ddl] schema syncer restarted.") case <-do.exit: return @@ -388,6 +403,29 @@ func (do *Domain) mustRestartSyncer() error { } } +// mustReload tries to Reload the schema, it does not return until it's successful or the domain is closed. +// it returns false when it is successful, returns true when the domain is closed. +func (do *Domain) mustReload() (exitLoop bool) { + for { + err := do.Reload() + if err == nil { + log.Infof("[ddl] mustReload succeed.") + return false + } + + log.Infof("[ddl] reload the schema failed: %v", err) + // If the domain is closed, we return immediately. + select { + case <-do.exit: + log.Infof("[ddl] domain is closed.") + return true + default: + } + + time.Sleep(200 * time.Millisecond) + } +} + // Close closes the Domain and release its resource. func (do *Domain) Close() { if do.ddl != nil { diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 7408ada38adc5..212ebac19b677 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -46,6 +46,8 @@ type SchemaValidator interface { Restart() // Reset resets SchemaValidator to initial state. Reset() + // IsStarted indicates whether SchemaValidator is started. 
+ IsStarted() bool } type deltaSchemaInfo struct { @@ -72,6 +74,13 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator { } } +func (s *schemaValidator) IsStarted() bool { + s.mux.Lock() + isStarted := s.isStarted + s.mux.Unlock() + return isStarted +} + func (s *schemaValidator) Stop() { log.Info("[domain-ddl] the schema validator stops") s.mux.Lock() @@ -165,7 +174,7 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ defer s.mux.RUnlock() if !s.isStarted { log.Infof("[domain-ddl] the schema validator stopped before checking") - return ResultFail + return ResultUnknown } if s.lease == 0 { return ResultSucc diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 05a9b567f5e28..47b6b13fbe890 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -57,7 +57,7 @@ func (*testSuite) TestSchemaValidator(c *C) { isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) c.Assert(isTablesChanged, IsTrue) valid = validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10}) - c.Assert(valid, Equals, ResultFail) + c.Assert(valid, Equals, ResultUnknown) validator.Restart() // Sleep for a long time, check schema is invalid.