Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support to replicate tables without explicit row id #1005

Merged
merged 7 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *changeFeed) addTable(tblInfo *model.TableInfo, targetTs model.Ts) {
return
}

if !tblInfo.IsEligible() {
if !tblInfo.IsEligible(c.info.Config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tblInfo.ID), zap.Stringer("table", tblInfo.TableName))
return
}
Expand Down
91 changes: 55 additions & 36 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type schemaSnapshot struct {
ineligibleTableID map[int64]struct{}

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -93,11 +96,11 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*SingleSchemaSnapshot, error) {
return newSchemaSnapshotFromMeta(meta, currentTs)
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
}

func newEmptySchemaSnapshot() *schemaSnapshot {
func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -108,11 +111,13 @@ func newEmptySchemaSnapshot() *schemaSnapshot {

truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot()
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -132,7 +137,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnap
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible()
isEligible := tableInfo.IsEligible(explicitTables)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -196,39 +201,51 @@ func (s *schemaSnapshot) PrintStatus(logger func(msg string, fields ...zap.Field

// Clone clones Storage
func (s *schemaSnapshot) Clone() *schemaSnapshot {
n := &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64, len(s.tableNameToID)),
schemaNameToID: make(map[string]int64, len(s.schemaNameToID)),

schemas: make(map[int64]*timodel.DBInfo, len(s.schemas)),
tables: make(map[int64]*model.TableInfo, len(s.tables)),
partitionTable: make(map[int64]*model.TableInfo, len(s.partitionTable)),
clone := *s

truncateTableID: make(map[int64]struct{}, len(s.truncateTableID)),
ineligibleTableID: make(map[int64]struct{}, len(s.ineligibleTableID)),
}
tableNameToID := make(map[model.TableName]int64, len(s.tableNameToID))
for k, v := range s.tableNameToID {
n.tableNameToID[k] = v
tableNameToID[k] = v
}
clone.tableNameToID = tableNameToID

schemaNameToID := make(map[string]int64, len(s.schemaNameToID))
for k, v := range s.schemaNameToID {
n.schemaNameToID[k] = v
schemaNameToID[k] = v
}
clone.schemaNameToID = schemaNameToID

schemas := make(map[int64]*timodel.DBInfo, len(s.schemas))
for k, v := range s.schemas {
n.schemas[k] = v.Clone()
schemas[k] = v.Clone()
}
clone.schemas = schemas

tables := make(map[int64]*model.TableInfo, len(s.tables))
for k, v := range s.tables {
n.tables[k] = v.Clone()
tables[k] = v.Clone()
}
clone.tables = tables

partitionTable := make(map[int64]*model.TableInfo, len(s.partitionTable))
for k, v := range s.partitionTable {
n.partitionTable[k] = v.Clone()
partitionTable[k] = v.Clone()
}
clone.partitionTable = partitionTable

truncateTableID := make(map[int64]struct{}, len(s.truncateTableID))
for k, v := range s.truncateTableID {
n.truncateTableID[k] = v
truncateTableID[k] = v
}
clone.truncateTableID = truncateTableID

ineligibleTableID := make(map[int64]struct{}, len(s.ineligibleTableID))
for k, v := range s.ineligibleTableID {
n.ineligibleTableID[k] = v
ineligibleTableID[k] = v
}
return n
clone.ineligibleTableID = ineligibleTableID

return &clone
}

// GetTableNameByID looks up a TableName with the given table id
Expand Down Expand Up @@ -464,14 +481,14 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
schema.Tables = append(schema.Tables, table.TableInfo)

s.tables[table.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -489,14 +506,14 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -610,25 +627,27 @@ type SchemaStorage struct {
gcTs uint64
resolvedTs uint64

filter *filter.Filter
filter *filter.Filter
explicitTables bool
}

// NewSchemaStorage creates a new schema storage
func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter) (*SchemaStorage, error) {
func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter, forceReplicate bool) (*SchemaStorage, error) {
var snap *schemaSnapshot
var err error
if meta == nil {
snap = newEmptySchemaSnapshot()
snap = newEmptySchemaSnapshot(forceReplicate)
} else {
snap, err = newSchemaSnapshotFromMeta(meta, startTs)
snap, err = newSchemaSnapshotFromMeta(meta, startTs, forceReplicate)
}
if err != nil {
return nil, errors.Trace(err)
}
schema := &SchemaStorage{
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -702,7 +721,7 @@ func (s *SchemaStorage) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot()
snap = newEmptySchemaSnapshot(s.explicitTables)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
88 changes: 83 additions & 5 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (t *schemaSuite) TestSchema(c *C) {
Query: "create database test",
}
// reconstruct the local schema
snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
err := snap.handleDDL(job)
c.Assert(err, IsNil)
_, exist := snap.SchemaByID(job.SchemaID)
Expand Down Expand Up @@ -195,7 +195,7 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(jobs, job)

// reconstruct the local schema
snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
for _, job := range jobs {
err := snap.handleDDL(job)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -277,7 +277,7 @@ func (*schemaSuite) TestTable(c *C) {

func (t *schemaSuite) TestHandleDDL(c *C) {

snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
dbName := timodel.NewCIStr("Test")
colName := timodel.NewCIStr("A")
tbName := timodel.NewCIStr("T")
Expand Down Expand Up @@ -527,7 +527,7 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) {
}

jobs = append(jobs, job)
storage, err := NewSchemaStorage(nil, 0, nil)
storage, err := NewSchemaStorage(nil, 0, nil, false)
c.Assert(err, IsNil)
for _, job := range jobs {
err := storage.HandleDDLJob(job)
Expand Down Expand Up @@ -665,7 +665,7 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *C) {
c.Assert(err, IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, IsNil)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false)
c.Assert(err, IsNil)
_, ok := snap.GetTableByName("test", "simple_test1")
c.Assert(ok, IsTrue)
Expand All @@ -677,3 +677,81 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *C) {
c.Assert(dbInfo.Name.O, Equals, "test2")
c.Assert(len(dbInfo.Tables), Equals, 3)
}

func (t *schemaSuite) TestSnapshotClone(c *C) {
store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)

session.SetSchemaLease(0)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
tk.MustExec("create table test.simple_test2 (id bigint primary key)")
tk.MustExec("create table test2.simple_test3 (id bigint primary key)")
tk.MustExec("create table test2.simple_test4 (id bigint primary key)")
tk.MustExec("create table test2.simple_test5 (a bigint)")
ver, err := store.CurrentVersion()
c.Assert(err, IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, IsNil)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* explicitTables */)
c.Assert(err, IsNil)

clone := snap.Clone()
c.Assert(clone.tableNameToID, DeepEquals, snap.tableNameToID)
c.Assert(clone.schemaNameToID, DeepEquals, snap.schemaNameToID)
c.Assert(clone.truncateTableID, DeepEquals, snap.truncateTableID)
c.Assert(clone.ineligibleTableID, DeepEquals, snap.ineligibleTableID)
c.Assert(clone.currentTs, Equals, snap.currentTs)
c.Assert(clone.explicitTables, Equals, snap.explicitTables)
c.Assert(len(clone.tables), Equals, len(snap.tables))
c.Assert(len(clone.schemas), Equals, len(snap.schemas))
c.Assert(len(clone.partitionTable), Equals, len(snap.partitionTable))

tableCount := len(snap.tables)
clone.tables = make(map[int64]*model.TableInfo)
c.Assert(len(snap.tables), Equals, tableCount)
}

func (t *schemaSuite) TestExplicitTables(c *C) {
store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)

session.SetSchemaLease(0)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
ver1, err := store.CurrentVersion()
c.Assert(err, IsNil)
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
tk.MustExec("create table test.simple_test2 (id bigint unique key)")
tk.MustExec("create table test2.simple_test3 (a bigint)")
tk.MustExec("create table test2.simple_test4 (a varchar(20) unique key)")
tk.MustExec("create table test2.simple_test5 (a varchar(20))")
ver2, err := store.CurrentVersion()
c.Assert(err, IsNil)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
c.Assert(err, IsNil)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* explicitTables */)
c.Assert(err, IsNil)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
c.Assert(err, IsNil)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* explicitTables */)
c.Assert(err, IsNil)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* explicitTables */)
c.Assert(err, IsNil)

c.Assert(len(snap2.tables)-len(snap1.tables), Equals, 5)
// some system tables are also ineligible
c.Assert(len(snap2.ineligibleTableID), GreaterEqual, 4)

c.Assert(len(snap3.tables)-len(snap1.tables), Equals, 5)
c.Assert(snap3.ineligibleTableID, HasLen, 0)
}
5 changes: 4 additions & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {
}

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible() bool {
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
if forceReplicate {
return true
}
if ti.IsView() {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (o *Owner) newChangeFeed(
if err != nil {
return nil, errors.Trace(err)
}
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, checkpointTs)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, checkpointTs, info.Config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (o *Owner) newChangeFeed(
log.Warn("table not found for table ID", zap.Int64("tid", tid))
continue
}
if !tblInfo.IsEligible() {
if !tblInfo.IsEligible(info.Config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tid), zap.Stringer("table", table))
continue
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
}()
t := meta.NewMeta(txn)

schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(t, 0)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(t, 0, false)
c.Assert(err, check.IsNil)

cf := &changeFeed{
Expand All @@ -755,6 +755,7 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
orphanTables: make(map[model.TableID]model.Ts),
toCleanTables: make(map[model.TableID]model.Ts),
filter: f,
info: &model.ChangeFeedInfo{Config: config.GetDefaultReplicaConfig()},
}
for i, job := range jobs {
err = cf.schema.HandleDDL(job)
Expand Down
Loading