Skip to content

Commit

Permalink
Merge branch 'release-5.4' into cherry-pick-4474-to-release-5.4
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Apr 28, 2022
2 parents d0a5e88 + bd9849b commit 9595058
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 97 deletions.
11 changes: 9 additions & 2 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,7 +199,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -217,6 +219,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
67 changes: 44 additions & 23 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type schemaSnapshot struct {

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
// if forceReplicate is true, treat ineligible tables as eligible.
forceReplicate bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -101,17 +101,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) {
// meta is nil only in unit tests
if meta == nil {
snap := newEmptySchemaSnapshot(explicitTables)
snap := newEmptySchemaSnapshot(forceReplicate)
snap.currentTs = currentTs
return snap, nil
}
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate)
}

func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -124,12 +124,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
forceReplicate: forceReplicate,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -149,7 +149,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible(explicitTables)
isEligible := tableInfo.IsEligible(forceReplicate)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
if !tbl.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
delete(oldIDs, partition.ID)
Expand Down Expand Up @@ -512,14 +512,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -537,14 +543,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -592,7 +604,10 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
return errors.Trace(err)
}
case timodel.ActionRenameTables:
return s.renameTables(job)
err := s.renameTables(job)
if err != nil {
return errors.Trace(err)
}
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
err := s.createTable(getWrapTableInfo(job))
if err != nil {
Expand Down Expand Up @@ -715,7 +730,7 @@ type schemaStorageImpl struct {
resolvedTs uint64

filter *filter.Filter
explicitTables bool
forceReplicate bool
}

// NewSchemaStorage creates a new schema storage
Expand All @@ -734,7 +749,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
forceReplicate: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -808,17 +823,23 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
if len(s.snaps) > 0 {
lastSnap := s.snaps[len(s.snaps)-1]
if job.BinlogInfo.FinishedTS <= lastSnap.currentTs {
log.Info("ignore foregone DDL",
zap.Int64("jobID", job.ID), zap.String("DDL", job.Query))
log.Info("ignore foregone DDL", zap.Int64("jobID", job.ID),
zap.String("DDL", job.Query), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))
return nil
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot(s.explicitTables)
snap = newEmptySchemaSnapshot(s.forceReplicate)
}
if err := snap.handleDDL(job); err != nil {
log.Error("handle DDL failed", zap.String("DDL", job.Query),
zap.Stringer("job", job), zap.Error(err),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))
return errors.Trace(err)
}
log.Info("handle DDL", zap.String("DDL", job.Query),
zap.Stringer("job", job), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))

s.snaps = append(s.snaps, snap)
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
return nil
Expand Down
19 changes: 11 additions & 8 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,11 @@ func TestHandleRenameTables(t *testing.T) {
rawArgs, err := json.Marshal(args)
require.Nil(t, err)
var job *timodel.Job = &timodel.Job{
Type: timodel.ActionRenameTables,
RawArgs: rawArgs,
BinlogInfo: &timodel.HistoryInfo{},
Type: timodel.ActionRenameTables,
RawArgs: rawArgs,
BinlogInfo: &timodel.HistoryInfo{
FinishedTS: 11112222,
},
}
job.BinlogInfo.MultipleTableInfos = append(job.BinlogInfo.MultipleTableInfos,
&timodel.TableInfo{
Expand Down Expand Up @@ -442,6 +444,7 @@ func TestHandleRenameTables(t *testing.T) {
t2 := model.TableName{Schema: "db_1", Table: "y"}
require.Equal(t, snap.tableNameToID[t1], int64(13))
require.Equal(t, snap.tableNameToID[t2], int64(14))
require.Equal(t, uint64(11112222), snap.currentTs)
}

func testDoDDLAndCheck(t *testing.T, snap *schemaSnapshot, job *timodel.Job, isErr bool) {
Expand Down Expand Up @@ -775,7 +778,7 @@ func TestSnapshotClone(t *testing.T) {
require.Nil(t, err)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
require.Nil(t, err)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* explicitTables */)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* forceReplicate */)
require.Nil(t, err)

clone := snap.Clone()
Expand All @@ -784,7 +787,7 @@ func TestSnapshotClone(t *testing.T) {
require.Equal(t, clone.truncateTableID, snap.truncateTableID)
require.Equal(t, clone.ineligibleTableID, snap.ineligibleTableID)
require.Equal(t, clone.currentTs, snap.currentTs)
require.Equal(t, clone.explicitTables, snap.explicitTables)
require.Equal(t, clone.forceReplicate, snap.forceReplicate)
require.Equal(t, len(clone.tables), len(snap.tables))
require.Equal(t, len(clone.schemas), len(snap.schemas))
require.Equal(t, len(clone.partitionTable), len(snap.partitionTable))
Expand Down Expand Up @@ -818,13 +821,13 @@ func TestExplicitTables(t *testing.T) {
require.Nil(t, err)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
require.Nil(t, err)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* explicitTables */)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */)
require.Nil(t, err)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
require.Nil(t, err)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* explicitTables */)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */)
require.Nil(t, err)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* explicitTables */)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */)
require.Nil(t, err)

require.Equal(t, len(snap2.tables)-len(snap1.tables), 5)
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if ti.IsSequence() {
return false
}
if forceReplicate {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,19 @@ func TestTableInfoGetterFuncs(t *testing.T) {
info = WrapTableInfo(1, "test", 0, &tbl)
require.False(t, info.IsEligible(false))
require.True(t, info.IsEligible(true))

// View is eligible.
tbl.View = &timodel.ViewInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsView())
require.True(t, info.IsEligible(false))

// Sequence is ineligible.
tbl.Sequence = &timodel.SequenceInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsSequence())
require.False(t, info.IsEligible(false))
require.False(t, info.IsEligible(true))
}

func TestTableInfoClone(t *testing.T) {
Expand Down
46 changes: 11 additions & 35 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,48 +116,24 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err
return ddlEvent, nil
}

func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo {
var sinkTableInfos []*model.SimpleTableInfo
for tableID := range s.schemaSnapshot.CloneTables() {
tblInfo, ok := s.schemaSnapshot.TableByID(tableID)
if !ok {
log.Panic("table not found for table ID", zap.Int64("tid", tableID))
}
if s.shouldIgnoreTable(tblInfo) {
continue
}
dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID)
if !ok {
log.Panic("schema not found for table ID", zap.Int64("tid", tableID))
}

// TODO separate function for initializing SimpleTableInfo
sinkTableInfo := new(model.SimpleTableInfo)
sinkTableInfo.Schema = dbInfo.Name.O
sinkTableInfo.TableID = tableID
sinkTableInfo.Table = tblInfo.TableName.Table
sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))
for i, colInfo := range tblInfo.Cols() {
sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo)
sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
sinkTableInfos = append(sinkTableInfos, sinkTableInfo)
}
return sinkTableInfos
}

func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool {
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool {
schemaName := t.TableName.Schema
tableName := t.TableName.Table
if s.filter.ShouldIgnoreTable(schemaName, tableName) {
return true
}
if s.config.Cyclic.IsEnabled() && mark.IsMarkTable(schemaName, tableName) {
// skip the mark table if cyclic is enabled
return true
}
if !tableInfo.IsEligible(s.config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName))
if !t.IsEligible(s.config.ForceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !t.IsSequence() {
log.Warn("skip ineligible table",
zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName))
}
return true
}
return false
Expand Down
3 changes: 3 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,9 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
p.sendError(err)
return nil
})

// FIXME: using GetLastSnapshot here would be confused and get the wrong table name
// after `rename table` DDL, since `rename table` keeps the tableID unchanged
var tableName *model.TableName
retry.Do(ctx, func() error { //nolint:errcheck
if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok {
Expand Down
Loading

0 comments on commit 9595058

Please sign in to comment.