From 790121921dafd5f97a57b79729f7a444f0c2fd0c Mon Sep 17 00:00:00 2001 From: yang <1362154550@qq.com> Date: Tue, 3 Sep 2024 00:49:46 +0800 Subject: [PATCH 1/2] =?UTF-8?q?bugfix:=E4=BF=AE=E5=A4=8Dtcc-fence=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E7=A9=BA=E5=9B=9E=E6=BB=9A=E6=97=A0=E9=99=90=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E4=BB=A5=E5=8F=8A=E6=97=A0=E6=B3=95=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E6=82=AC=E6=8C=82bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rm/tcc/fence/fence_api.go | 15 ++++++++------- .../fence/handler/tcc_fence_wrapper_handler.go | 14 +++++++------- pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/rm/tcc/fence/fence_api.go b/pkg/rm/tcc/fence/fence_api.go index 501a3733c..6e7967fe8 100644 --- a/pkg/rm/tcc/fence/fence_api.go +++ b/pkg/rm/tcc/fence/fence_api.go @@ -29,10 +29,11 @@ import ( // WithFence Execute the fence database operation first and then call back the business method func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) { - if err = DoFence(ctx, tx); err != nil { + if skip, err := DoFence(ctx, tx); err != nil { return err + } else if skip { + return nil } - if err := callback(); err != nil { return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err) } @@ -47,20 +48,20 @@ func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err erro // case 3: if fencePhase is FencePhaseCommit, will do commit fence operation. // case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation. // case 5: if fencePhase not in above case, will return a fence phase illegal error. -func DoFence(ctx context.Context, tx *sql.Tx) error { +func DoFence(ctx context.Context, tx *sql.Tx) (bool, error) { hd := handler.GetFenceHandler() phase := tm.GetFencePhase(ctx) switch phase { case enum.FencePhaseNotExist: - return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)) + return false, fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)) case enum.FencePhasePrepare: - return hd.PrepareFence(ctx, tx) + return false, hd.PrepareFence(ctx, tx) case enum.FencePhaseCommit: - return hd.CommitFence(ctx, tx) + return false, hd.CommitFence(ctx, tx) case enum.FencePhaseRollback: return hd.RollbackFence(ctx, tx) } - return fmt.Errorf("fence phase: %v illegal", phase) + return false, fmt.Errorf("fence phase: %v illegal", phase) } diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go index d719f01e1..909e70295 100644 --- a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -110,37 +110,37 @@ func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql. return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted) } -func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error { +func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) (bool, error) { xid := tm.GetBusinessActionContext(ctx).Xid branchId := tm.GetBusinessActionContext(ctx).BranchId actionName := tm.GetBusinessActionContext(ctx).ActionName fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) if err != nil { - return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + return false, fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } // record is null, mean the need suspend if fenceDo == nil { err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended) if err != nil { - return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + return false, fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) - return nil + return true, nil } // have rollbacked or suspended if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { // enable warn level log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) - return nil + return true, nil } if fenceDo.Status == enum.StatusCommitted { log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) - return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return false, fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) } - return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) + return false, handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) } func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error { diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go index 5da7ad7a6..7bf75cd5a 100644 --- a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go @@ -75,7 +75,7 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil { // will return error, if rows is empty if err.Error() == "sql: no rows in result set" { - return nil, fmt.Errorf("query tcc fence get scan row,no rows in result set, [%w]", err) + return nil, nil } else { return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err) } From b31926c210dec306b92964231403d960ce6dc62c Mon Sep 17 00:00:00 2001 From: yang <1362154550@qq.com> Date: Tue, 3 Sep 2024 01:26:02 +0800 Subject: [PATCH 2/2] =?UTF-8?q?bugfix:=E4=BF=AE=E5=A4=8Dtcc-fence=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E7=A9=BA=E5=9B=9E=E6=BB=9A=E6=97=A0=E9=99=90=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E4=BB=A5=E5=8F=8A=E6=97=A0=E6=B3=95=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E6=82=AC=E6=8C=82bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go index 909e70295..96280a195 100644 --- a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -126,14 +126,14 @@ func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sq return false, fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) - return true, nil + return true, tx.Commit() } // have rollbacked or suspended if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { // enable warn level log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) - return true, nil + return true, tx.Commit() } if fenceDo.Status == enum.StatusCommitted { log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)