Skip to content

Commit

Permalink
schemastore: support rename tables (pingcap#796)
Browse files Browse the repository at this point in the history
* support rename tables

* small refactor

* fix test
  • Loading branch information
lidezhu authored Jan 6, 2025
1 parent 8d31827 commit fadff65
Show file tree
Hide file tree
Showing 6 changed files with 1,107 additions and 290 deletions.
33 changes: 7 additions & 26 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,10 @@ func (p *persistentStorage) fetchTableDDLEvents(tableID int64, tableFilter filte
events := make([]commonEvent.DDLEvent, 0, len(allTargetTs))
for _, ts := range allTargetTs {
rawEvent := readPersistedDDLEvent(storageSnap, ts)
// TODO: if ExtraSchemaName and other fields are empty, does it cause any problem?
if tableFilter != nil &&
tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) &&
tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName) {
continue
ddlEvent, ok := buildDDLEvent(&rawEvent, tableFilter)
if ok {
events = append(events, ddlEvent)
}
events = append(events, buildDDLEvent(&rawEvent, tableFilter))
}
// log.Info("fetchTableDDLEvents",
// zap.Int64("tableID", tableID),
Expand Down Expand Up @@ -458,26 +455,10 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter
p.mu.RUnlock()
for _, ts := range allTargetTs {
rawEvent := readPersistedDDLEvent(storageSnap, ts)
if tableFilter != nil {
if rawEvent.Type == byte(model.ActionCreateTables) {
allFiltered := true
for _, tableInfo := range rawEvent.MultipleTableInfos {
if !tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, tableInfo.Name.O) {
allFiltered = false
break
}
}
if allFiltered {
continue
}
} else {
if tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) &&
tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName) {
continue
}
}
ddlEvent, ok := buildDDLEvent(&rawEvent, tableFilter)
if ok {
events = append(events, ddlEvent)
}
events = append(events, buildDDLEvent(&rawEvent, tableFilter))
}
storageSnap.Close()
if len(events) >= limit {
Expand Down Expand Up @@ -784,7 +765,7 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
return false
}

func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent {
func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) (commonEvent.DDLEvent, bool) {
handler, ok := allDDLHandlers[model.ActionType(rawEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", rawEvent.Type), zap.String("query", rawEvent.Query))
Expand Down
Loading

0 comments on commit fadff65

Please sign in to comment.