Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix undo log sql type bug #495

Merged
merged 1 commit into from
Mar 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/datasource/sql/conn_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (c *ATConn) createNewTxOnExecIfNeed(ctx context.Context, f func() (types.Ex
}
defer func() {
recoverErr := recover()
if err != nil || recoverErr != nil {
log.Errorf("conn at rollback error:%v or recoverErr:%v", err, recoverErr)
if recoverErr != nil {
log.Errorf("at exec panic, recoverErr:%v", recoverErr)
if tx != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/exec/at/at_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (e *ATExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.Exec
executor = NewDeleteExecutor(queryParser, execCtx, e.hooks)
case types.SQLTypeSelectForUpdate:
executor = NewSelectForUpdateExecutor(queryParser, execCtx, e.hooks)
case types.SQLTypeInsertOnUpdate:
case types.SQLTypeInsertOnDuplicateUpdate:
executor = NewInsertOnUpdateExecutor(queryParser, execCtx, e.hooks)
case types.SQLTypeMulti:
executor = NewMultiExecutor(queryParser, execCtx, e.hooks)
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/base_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (b *baseExecutor) traversalArgs(node ast.Node, argsIndex *[]int32) {
}
}

func (b *baseExecutor) buildRecordImages(rowsi driver.Rows, tableMetaData *types.TableMeta) (*types.RecordImage, error) {
func (b *baseExecutor) buildRecordImages(rowsi driver.Rows, tableMetaData *types.TableMeta, sqlType types.SQLType) (*types.RecordImage, error) {
// select column names
columnNames := rowsi.Columns()
rowImages := make([]types.RowImage, 0)
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *baseExecutor) buildRecordImages(rowsi driver.Rows, tableMetaData *types
rowImages = append(rowImages, types.RowImage{Columns: columns})
}

return &types.RecordImage{TableName: tableMetaData.TableName, Rows: rowImages}, nil
return &types.RecordImage{TableName: tableMetaData.TableName, Rows: rowImages, SQLType: sqlType}, nil
}

func getSqlNullValue(value interface{}) interface{} {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/exec/at/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e
return nil, err
}

image, err := d.buildRecordImages(rowsi, metaData)
image, err := d.buildRecordImages(rowsi, metaData, types.SQLTypeDelete)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/exec/at/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, er
return nil, fmt.Errorf("invalid conn")
}

image, err := i.buildRecordImages(rowsi, metaData)
image, err := i.buildRecordImages(rowsi, metaData, types.SQLTypeInsert)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/insert_on_update_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (i *insertOnUpdateExecutor) beforeImage(ctx context.Context) (*types.Record
log.Errorf("ctx driver query: %+v", err)
return nil, err
}
image, err := i.buildRecordImages(rowsi, metaData)
image, err := i.buildRecordImages(rowsi, metaData, types.SQLTypeInsertOnDuplicateUpdate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (i *insertOnUpdateExecutor) afterImage(ctx context.Context, beforeImages *t
if err != nil {
return nil, err
}
afterImage, err := i.buildRecordImages(rowsi, metaData)
afterImage, err := i.buildRecordImages(rowsi, metaData, types.SQLTypeInsertOnDuplicateUpdate)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/multi_delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (m *multiDeleteExecutor) beforeImage(ctx context.Context) ([]*types.RecordI
if err != nil {
return nil, err
}
image, err = m.buildRecordImages(rowsi, metaData)
image, err = m.buildRecordImages(rowsi, metaData, types.SQLTypeDelete)
if err != nil {
log.Errorf("record images : %+v", err)
return nil, err
Expand Down Expand Up @@ -150,7 +150,7 @@ func (m *multiDeleteExecutor) buildBeforeImageSQL() ([]string, []driver.NamedVal
if err != nil {
return nil, nil, err
}
for _, p := range ps.MultiStmt {
for _, p = range ps.MultiStmt {
tableName = p.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O
v, ok := tables[tableName]
if ok && v.clear {
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/multi_update_excutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (u *multiUpdateExecutor) beforeImage(ctx context.Context) ([]*types.RecordI
return nil, err
}

image, err := u.buildRecordImages(rows, metaData)
image, err := u.buildRecordImages(rows, metaData, types.SQLTypeUpdate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (u *multiUpdateExecutor) afterImage(ctx context.Context, beforeImages []*ty
return nil, err
}

image, err := u.buildRecordImages(rows, metaData)
image, err := u.buildRecordImages(rows, metaData, types.SQLTypeUpdate)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/update_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (u *updateExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e
return nil, fmt.Errorf("invalid conn")
}

image, err := u.buildRecordImages(rowsi, metaData)
image, err := u.buildRecordImages(rowsi, metaData, types.SQLTypeUpdate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (u *updateExecutor) afterImage(ctx context.Context, beforeImage types.Recor
return nil, fmt.Errorf("invalid conn")
}

afterImage, err := u.buildRecordImages(rowsi, metaData)
afterImage, err := u.buildRecordImages(rowsi, metaData, types.SQLTypeUpdate)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/parser/parser_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func parseParseContext(stmtNode ast.StmtNode) *types.ParseContext {
parserCtx.ExecutorType = types.ReplaceIntoExecutor
}
if len(stmt.OnDuplicate) != 0 {
parserCtx.SQLType = types.SQLTypeInsertOnUpdate
parserCtx.SQLType = types.SQLTypeInsertOnDuplicateUpdate
parserCtx.ExecutorType = types.InsertOnDuplicateExecutor
}
case *ast.UpdateStmt:
Expand Down
31 changes: 24 additions & 7 deletions pkg/datasource/sql/types/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ package types
//go:generate stringer -type=SQLType
type SQLType int32

// reference:https://github.com/seata/seata/blob/2.x/sqlparser/seata-sqlparser-core/src/main/java/io/seata/sqlparser/SQLType.java
const (
_ SQLType = iota
SQLTypeUnknown
SQLTypeSelect
SQLTypeSelect = iota
SQLTypeInsert
SQLTypeUpdate
SQLTypeDelete
SQLTypeSelectForUpdate
SQLTypeInsertOnUpdate
SQLTypeReplace
SQLTypeTruncate
SQLTypeCreate
Expand All @@ -41,19 +39,38 @@ const (
SQLTypeDump
SQLTypeDebug
SQLTypeExplain
SQLTypeProcedure
SQLTypeDesc
SQLLastInsertID
SQLSelectWithoutTable
SQLCreateSequence
SQLShowSequence
SQLGetSequence
SQLAlterSequence
SQLDropSequence
SQLTddlShow
SQLTypeSet
SQLTypeReload
SQLTypeSelectUnion
SQLTypeCreateTable
SQLTypeDropTable
SQLTypeAlterTable
SQLTypeSavePoint
SQLTypeSelectFromUpdate
SQLTypeMultiDelete
SQLTypeMultiUpdate
SQLTypeCreateIndex
SQLTypeDropIndex
SQLTypeMulti
SQLTypeKill
SQLTypeLockTables
SQLTypeUnLockTables
SQLTypeCheckTable
SQLTypeSelectFoundRows
SQLTypeInsertIgnore = iota + 57
SQLTypeInsertOnDuplicateUpdate
// SQLTypeMulti and SQLTypeUnknown is different from seata-java
SQLTypeMulti = iota + 999
SQLTypeUnknown
)

func (s SQLType) MarshalText() (text []byte, err error) {
Expand All @@ -68,7 +85,7 @@ func (s SQLType) MarshalText() (text []byte, err error) {
return []byte("DELETE"), nil
case SQLTypeSelectForUpdate:
return []byte("SELECT_FOR_UPDATE"), nil
case SQLTypeInsertOnUpdate:
case SQLTypeInsertOnDuplicateUpdate:
return []byte("INSERT_ON_UPDATE"), nil
case SQLTypeReplace:
return []byte("REPLACE"), nil
Expand Down Expand Up @@ -137,7 +154,7 @@ func (s *SQLType) UnmarshalText(b []byte) error {
case "SELECT_FOR_UPDATE":
*s = SQLTypeSelectForUpdate
case "INSERT_ON_UPDATE":
*s = SQLTypeInsertOnUpdate
*s = SQLTypeInsertOnDuplicateUpdate
case "REPLACE":
*s = SQLTypeReplace
case "TRUNCATE":
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/undo/base/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context
parseContext[compressorTypeKey] = "NONE"
undoLogContent := m.encodeUndoLogCtx(parseContext)

logParse,err:= parser.GetCache().Load(parseContext[serializerKey])
if err!=nil{
logParse, err := parser.GetCache().Load(parseContext[serializerKey])
if err != nil {
return err
}

Expand Down