Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix:Fix the issues of empty rollback infinite retries and inability to resolve hanging bugs in TCC-Fence mode. #684

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
bugfix:修复tcc-fence模式空回滚无限重试以及无法解决悬挂bug
yangwenbinch committed Sep 2, 2024
commit 790121921dafd5f97a57b79729f7a444f0c2fd0c
15 changes: 8 additions & 7 deletions pkg/rm/tcc/fence/fence_api.go
Original file line number Diff line number Diff line change
@@ -29,10 +29,11 @@ import (

// WithFence Execute the fence database operation first and then call back the business method
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
if err = DoFence(ctx, tx); err != nil {
if skip, err := DoFence(ctx, tx); err != nil {
return err
} else if skip {
return nil
}

if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}
@@ -47,20 +48,20 @@ func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err erro
// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation.
// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation.
// case 5: if fencePhase not in above case, will return a fence phase illegal error.
func DoFence(ctx context.Context, tx *sql.Tx) error {
func DoFence(ctx context.Context, tx *sql.Tx) (bool, error) {
hd := handler.GetFenceHandler()
phase := tm.GetFencePhase(ctx)

switch phase {
case enum.FencePhaseNotExist:
return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
return false, fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case enum.FencePhasePrepare:
return hd.PrepareFence(ctx, tx)
return false, hd.PrepareFence(ctx, tx)
case enum.FencePhaseCommit:
return hd.CommitFence(ctx, tx)
return false, hd.CommitFence(ctx, tx)
case enum.FencePhaseRollback:
return hd.RollbackFence(ctx, tx)
}

return fmt.Errorf("fence phase: %v illegal", phase)
return false, fmt.Errorf("fence phase: %v illegal", phase)
}
14 changes: 7 additions & 7 deletions pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
Original file line number Diff line number Diff line change
@@ -110,37 +110,37 @@ func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.
return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted)
}

func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error {
func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) (bool, error) {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
return false, fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}

// record is null, mean the need suspend
if fenceDo == nil {
err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended)
if err != nil {
return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
return false, fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}
log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId)
return nil
return true, nil
}

// have rollbacked or suspended
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return nil
return true, nil
}
if fenceDo.Status == enum.StatusCommitted {
log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return false, fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
return false, handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
}

func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
2 changes: 1 addition & 1 deletion pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br
if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil {
// will return error, if rows is empty
if err.Error() == "sql: no rows in result set" {
return nil, fmt.Errorf("query tcc fence get scan row,no rows in result set, [%w]", err)
return nil, nil
} else {
return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err)
}