Skip to content

Commit

Permalink
ddl: add multiple table infos (pingcap#974)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Feb 8, 2025
1 parent 335c91f commit 9c09d98
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 29 deletions.
7 changes: 7 additions & 0 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1913,6 +1913,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
var addNames, dropNames []commonEvent.SchemaTableName
allFiltered := true
resultQuerys := make([]string, 0)
tableInfos := make([]*common.TableInfo, 0)
if len(querys) != len(rawEvent.MultipleTableInfos) {
log.Panic("rename tables length is not equal table infos", zap.Any("querys", querys), zap.Any("tableInfos", rawEvent.MultipleTableInfos))
}
Expand All @@ -1927,6 +1928,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo))
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, allPhysicalIDs...)
if !ignoreCurrentTable {
// check whether schema change
Expand Down Expand Up @@ -1969,6 +1971,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
} else {
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo))
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, tableInfo.ID)
if !ignoreCurrentTable {
if rawEvent.PrevSchemaIDs[i] != rawEvent.CurrentSchemaIDs[i] {
Expand Down Expand Up @@ -2017,6 +2020,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
}
}
ddlEvent.Query = strings.Join(resultQuerys, "")
ddlEvent.MultipleTableInfos = tableInfos
return ddlEvent, true
}

Expand Down Expand Up @@ -2057,6 +2061,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount)
addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount)
resultQuerys := make([]string, 0, logicalTableCount)
tableInfos := make([]*common.TableInfo, 0, logicalTableCount)
for i, info := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) {
log.Info("build ddl event for create tables filter table",
Expand All @@ -2082,11 +2087,13 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
TableName: info.Name.O,
})
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, info))
}
ddlEvent.TableNameChange = &commonEvent.TableNameChange{
AddName: addName,
}
ddlEvent.Query = strings.Join(resultQuerys, "")
ddlEvent.MultipleTableInfos = tableInfos
if len(ddlEvent.NeedAddedTables) == 0 {
log.Fatal("should not happen")
}
Expand Down
65 changes: 36 additions & 29 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,46 +150,27 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
FinishedTs: d.FinishedTs,
},
}
case model.ActionCreateTables:
events := make([]*DDLEvent, 0, len(d.TableNameChange.AddName))
case model.ActionCreateTables, model.ActionRenameTables:
events := make([]*DDLEvent, 0, len(d.MultipleTableInfos))
queries, err := SplitQueries(d.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
if len(queries) != len(d.TableNameChange.AddName) {
log.Panic("queries length should be equal to addName length", zap.String("query", d.Query), zap.Any("addName", d.TableNameChange.AddName))
if len(queries) != len(d.MultipleTableInfos) {
log.Panic("queries length should be equal to multipleTableInfos length", zap.String("query", d.Query), zap.Any("multipleTableInfos", d.MultipleTableInfos))
}
for i, schemaAndTable := range d.TableNameChange.AddName {
for i, info := range d.MultipleTableInfos {
events = append(events, &DDLEvent{
Version: d.Version,
Type: d.Type,
SchemaName: schemaAndTable.SchemaName,
TableName: schemaAndTable.TableName,
SchemaName: info.GetSchemaName(),
TableName: info.GetTableName(),
TableInfo: info,
Query: queries[i],
FinishedTs: d.FinishedTs,
})
}
return events
case model.ActionRenameTables:
events := make([]*DDLEvent, 0, len(d.TableNameChange.DropName))
queries, err := SplitQueries(d.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
if len(queries) != len(d.TableNameChange.DropName) {
log.Panic("queries length should be equal to dropName length", zap.String("query", d.Query), zap.Any("dropName", d.TableNameChange.DropName))
}
for i, schemaAndTable := range d.TableNameChange.DropName {
events = append(events, &DDLEvent{
Version: d.Version,
Type: d.Type,
PrevSchemaName: schemaAndTable.SchemaName,
PrevTableName: schemaAndTable.TableName,
Query: queries[i],
FinishedTs: d.FinishedTs,
})
}
return events
default:
}
return []*DDLEvent{d}
Expand Down Expand Up @@ -247,7 +228,7 @@ func (e *DDLEvent) GetDDLType() model.ActionType {
}

func (t DDLEvent) Marshal() ([]byte, error) {
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | errorData | errorDataSize
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize |errorData | errorDataSize
data, err := json.Marshal(t)
if err != nil {
return nil, err
Expand All @@ -273,6 +254,20 @@ func (t DDLEvent) Marshal() ([]byte, error) {
data = append(data, tableInfoDataSize...)
}

for _, info := range t.MultipleTableInfos {
tableInfoData, err := info.Marshal()
if err != nil {
return nil, err
}
tableInfoDataSize := make([]byte, 8)
binary.BigEndian.PutUint64(tableInfoDataSize, uint64(len(tableInfoData)))
data = append(data, tableInfoData...)
data = append(data, tableInfoDataSize...)
}
multipletableInfosDataSize := make([]byte, 8)
binary.BigEndian.PutUint64(multipletableInfosDataSize, uint64(len(t.MultipleTableInfos)))
data = append(data, multipletableInfosDataSize...)

if t.Err != nil {
errData := []byte(t.Err.Error())
errDataSize := make([]byte, 8)
Expand All @@ -288,7 +283,7 @@ func (t DDLEvent) Marshal() ([]byte, error) {
}

func (t *DDLEvent) Unmarshal(data []byte) error {
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | errorData | errorDataSize
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize | errorData | errorDataSize
t.eventSize = int64(len(data))
errorDataSize := binary.BigEndian.Uint64(data[len(data)-8:])
if errorDataSize > 0 {
Expand All @@ -297,6 +292,18 @@ func (t *DDLEvent) Unmarshal(data []byte) error {
t.Err = apperror.ErrDDLEventError.FastGen(string(errorData))
}
end := len(data) - 8 - int(errorDataSize)
multipletableInfosDataSize := binary.BigEndian.Uint64(data[end-8 : end])
for i := 0; i < int(multipletableInfosDataSize); i++ {
tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end])
tableInfoData := data[end-8-int(tableInfoDataSize) : end-8]
info, err := common.UnmarshalJSONToTableInfo(tableInfoData)
if err != nil {
return err
}
t.MultipleTableInfos = append(t.MultipleTableInfos, info)
end -= 8 + int(tableInfoDataSize)
}
end -= 8 + int(multipletableInfosDataSize)
tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end])
var err error
if tableInfoDataSize > 0 {
Expand Down

0 comments on commit 9c09d98

Please sign in to comment.