From 9c09d98a7400ee3bdfe3c17dea710f77bc5dd63e Mon Sep 17 00:00:00 2001 From: nhsmw Date: Sat, 8 Feb 2025 16:22:47 +0800 Subject: [PATCH] ddl: add multiple table infos (#974) --- .../persist_storage_ddl_handlers.go | 7 ++ pkg/common/event/ddl_event.go | 65 ++++++++++--------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 5dc5cb13..54232e60 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -1913,6 +1913,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte var addNames, dropNames []commonEvent.SchemaTableName allFiltered := true resultQuerys := make([]string, 0) + tableInfos := make([]*common.TableInfo, 0) if len(querys) != len(rawEvent.MultipleTableInfos) { log.Panic("rename tables length is not equal table infos", zap.Any("querys", querys), zap.Any("tableInfos", rawEvent.MultipleTableInfos)) } @@ -1927,6 +1928,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo) if !ignorePrevTable { resultQuerys = append(resultQuerys, querys[i]) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo)) ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, allPhysicalIDs...) if !ignoreCurrentTable { // check whether schema change @@ -1969,6 +1971,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } else { if !ignorePrevTable { resultQuerys = append(resultQuerys, querys[i]) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo)) ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, tableInfo.ID) if !ignoreCurrentTable { if rawEvent.PrevSchemaIDs[i] != rawEvent.CurrentSchemaIDs[i] { @@ -2017,6 +2020,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } } ddlEvent.Query = strings.Join(resultQuerys, "") + ddlEvent.MultipleTableInfos = tableInfos return ddlEvent, true } @@ -2057,6 +2061,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount) addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount) resultQuerys := make([]string, 0, logicalTableCount) + tableInfos := make([]*common.TableInfo, 0, logicalTableCount) for i, info := range rawEvent.MultipleTableInfos { if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) { log.Info("build ddl event for create tables filter table", @@ -2082,11 +2087,13 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte TableName: info.Name.O, }) resultQuerys = append(resultQuerys, querys[i]) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, info)) } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: addName, } ddlEvent.Query = strings.Join(resultQuerys, "") + ddlEvent.MultipleTableInfos = tableInfos if len(ddlEvent.NeedAddedTables) == 0 { log.Fatal("should not happen") } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ba1c24af..9bf85683 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -150,46 +150,27 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { FinishedTs: d.FinishedTs, }, } - case model.ActionCreateTables: - events := make([]*DDLEvent, 0, len(d.TableNameChange.AddName)) + case model.ActionCreateTables, model.ActionRenameTables: + events := make([]*DDLEvent, 0, len(d.MultipleTableInfos)) queries, err := SplitQueries(d.Query) if err != nil { log.Panic("split queries failed", zap.Error(err)) } - if len(queries) != len(d.TableNameChange.AddName) { - log.Panic("queries length should be equal to addName length", zap.String("query", d.Query), zap.Any("addName", d.TableNameChange.AddName)) + if len(queries) != len(d.MultipleTableInfos) { + log.Panic("queries length should be equal to multipleTableInfos length", zap.String("query", d.Query), zap.Any("multipleTableInfos", d.MultipleTableInfos)) } - for i, schemaAndTable := range d.TableNameChange.AddName { + for i, info := range d.MultipleTableInfos { events = append(events, &DDLEvent{ Version: d.Version, Type: d.Type, - SchemaName: schemaAndTable.SchemaName, - TableName: schemaAndTable.TableName, + SchemaName: info.GetSchemaName(), + TableName: info.GetTableName(), + TableInfo: info, Query: queries[i], FinishedTs: d.FinishedTs, }) } return events - case model.ActionRenameTables: - events := make([]*DDLEvent, 0, len(d.TableNameChange.DropName)) - queries, err := SplitQueries(d.Query) - if err != nil { - log.Panic("split queries failed", zap.Error(err)) - } - if len(queries) != len(d.TableNameChange.DropName) { - log.Panic("queries length should be equal to dropName length", zap.String("query", d.Query), zap.Any("dropName", d.TableNameChange.DropName)) - } - for i, schemaAndTable := range d.TableNameChange.DropName { - events = append(events, &DDLEvent{ - Version: d.Version, - Type: d.Type, - PrevSchemaName: schemaAndTable.SchemaName, - PrevTableName: schemaAndTable.TableName, - Query: queries[i], - FinishedTs: d.FinishedTs, - }) - } - return events default: } return []*DDLEvent{d} @@ -247,7 +228,7 @@ func (e *DDLEvent) GetDDLType() model.ActionType { } func (t DDLEvent) Marshal() ([]byte, error) { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | errorData | errorDataSize + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize |errorData | errorDataSize data, err := json.Marshal(t) if err != nil { return nil, err @@ -273,6 +254,20 @@ func (t DDLEvent) Marshal() ([]byte, error) { data = append(data, tableInfoDataSize...) } + for _, info := range t.MultipleTableInfos { + tableInfoData, err := info.Marshal() + if err != nil { + return nil, err + } + tableInfoDataSize := make([]byte, 8) + binary.BigEndian.PutUint64(tableInfoDataSize, uint64(len(tableInfoData))) + data = append(data, tableInfoData...) + data = append(data, tableInfoDataSize...) + } + multipletableInfosDataSize := make([]byte, 8) + binary.BigEndian.PutUint64(multipletableInfosDataSize, uint64(len(t.MultipleTableInfos))) + data = append(data, multipletableInfosDataSize...) + if t.Err != nil { errData := []byte(t.Err.Error()) errDataSize := make([]byte, 8) @@ -288,7 +283,7 @@ func (t DDLEvent) Marshal() ([]byte, error) { } func (t *DDLEvent) Unmarshal(data []byte) error { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | errorData | errorDataSize + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize | errorData | errorDataSize t.eventSize = int64(len(data)) errorDataSize := binary.BigEndian.Uint64(data[len(data)-8:]) if errorDataSize > 0 { @@ -297,6 +292,18 @@ func (t *DDLEvent) Unmarshal(data []byte) error { t.Err = apperror.ErrDDLEventError.FastGen(string(errorData)) } end := len(data) - 8 - int(errorDataSize) + multipletableInfosDataSize := binary.BigEndian.Uint64(data[end-8 : end]) + for i := 0; i < int(multipletableInfosDataSize); i++ { + tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) + tableInfoData := data[end-8-int(tableInfoDataSize) : end-8] + info, err := common.UnmarshalJSONToTableInfo(tableInfoData) + if err != nil { + return err + } + t.MultipleTableInfos = append(t.MultipleTableInfos, info) + end -= 8 + int(tableInfoDataSize) + } + end -= 8 + int(multipletableInfosDataSize) tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) var err error if tableInfoDataSize > 0 {