Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: support instance BusinessActionContext outside the TCC try … #179

Merged
merged 14 commits into from
Aug 13, 2022
3 changes: 2 additions & 1 deletion goimports.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
#

go get -v golang.org/x/tools/cmd/goimports
goimports -w .
goimports -w .
go mod tidy
13 changes: 10 additions & 3 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package common

const (
StartTime = "action-start-time"
HostName = "host-name"
ActionContext = "actionContext"
ActionStartTime = "action-start-time"
HostName = "host-name"
ActionContext = "actionContext"

PrepareMethod = "sys::prepare"
CommitMethod = "sys::commit"
RollbackMethod = "sys::rollback"
ActionName = "actionName"

SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
Expand All @@ -30,4 +35,6 @@ const (
BranchTypeKey = "TX_BRANCH_TYPE"
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"

TccBusinessActionContextParameter = "tccParam"
)
161 changes: 137 additions & 24 deletions pkg/rm/tcc/tcc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ package tcc
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sync"
"time"

"github.com/seata/seata-go/pkg/common/types"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/common/types"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"
Expand Down Expand Up @@ -74,52 +73,166 @@ func (t *TCCServiceProxy) Reference() string {
return types.GetReference(t.TCCResource.TwoPhaseAction.GetTwoPhaseService())
}

func (t *TCCServiceProxy) Prepare(ctx context.Context, param ...interface{}) (interface{}, error) {
func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (interface{}, error) {
if tm.IsTransactionOpened(ctx) {
err := t.registeBranch(ctx)
err := t.registeBranch(ctx, params)
if err != nil {
return nil, err
}
}
return t.TCCResource.Prepare(ctx, param)
return t.TCCResource.Prepare(ctx, params)
}

func (t *TCCServiceProxy) registeBranch(ctx context.Context) error {
// register transaction branch
// registeBranch send register branch transaction request
func (t *TCCServiceProxy) registeBranch(ctx context.Context, params interface{}) error {
if !tm.IsTransactionOpened(ctx) {
err := errors.New("BranchRegister error, transaction should be opened")
log.Errorf(err.Error())
return err
}
// todo add param
tccContext := make(map[string]interface{}, 0)
tccContext[common.StartTime] = time.Now().UnixNano() / 1e6
tccContext[common.HostName] = net.GetLocalIp()
tccContextStr, _ := json.Marshal(map[string]interface{}{
common.ActionContext: tccContext,

tccContext := t.initBusinessActionContext(ctx, params)
actionContext := t.initActionContext(params)
for k, v := range actionContext {
tccContext.ActionContext[k] = v
}

applicationData, _ := json.Marshal(map[string]interface{}{
common.ActionContext: actionContext,
})
branchId, err := rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{
BranchType: branch.BranchTypeTCC,
ResourceId: t.GetActionName(),
ClientId: "",
Xid: tm.GetXID(ctx),
ApplicationData: string(tccContextStr),
ApplicationData: string(applicationData),
LockKeys: "",
})
if err != nil {
err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error()))
log.Errorf(err.Error())
log.Errorf("register branch transaction error %s ", err.Error())
return err
}
tccContext.BranchId = branchId
tm.SetBusinessActionContext(ctx, tccContext)
return nil
}

actionContext := &tm.BusinessActionContext{
Xid: tm.GetXID(ctx),
BranchId: branchId,
ActionName: t.GetActionName(),
//ActionContext: param,
// initActionContext init action context
func (t *TCCServiceProxy) initActionContext(params interface{}) map[string]interface{} {
actionContext := t.getActionContextParameters(params)
actionContext[common.ActionStartTime] = time.Now().UnixNano() / 1e6
actionContext[common.PrepareMethod] = t.TCCResource.TwoPhaseAction.GetPrepareMethodName()
actionContext[common.CommitMethod] = t.TCCResource.TwoPhaseAction.GetCommitMethodName()
actionContext[common.RollbackMethod] = t.TCCResource.TwoPhaseAction.GetRollbackMethodName()
actionContext[common.ActionName] = t.TCCResource.TwoPhaseAction.GetActionName()
actionContext[common.HostName] = net.GetLocalIp()
return actionContext
}

func (t *TCCServiceProxy) getActionContextParameters(params interface{}) map[string]interface{} {
var (
actionContext = make(map[string]interface{}, 0)
typ reflect.Type
val reflect.Value
isStruct bool
)
if params == nil {
return actionContext
}
if isStruct, val, typ = obtainStructValueType(params); !isStruct {
return actionContext
}
for i := 0; i < typ.NumField(); i++ {
// skip unexported anonymous filed
if typ.Field(i).PkgPath != "" {
continue
}
structField := typ.Field(i)
// skip ignored field
tagVal, hasTag := structField.Tag.Lookup(common.TccBusinessActionContextParameter)
if !hasTag || tagVal == `-` || tagVal == "" {
continue
}
actionContext[tagVal] = val.Field(i).Interface()
}
return actionContext
}

// initBusinessActionContext init tcc context
func (t *TCCServiceProxy) initBusinessActionContext(ctx context.Context, params interface{}) *tm.BusinessActionContext {
tccContext := t.getOrCreateBusinessActionContext(params)
tccContext.Xid = tm.GetXID(ctx)
tccContext.ActionName = t.GetActionName()
// todo read from config file
tccContext.IsDelayReport = true
if tccContext.ActionContext == nil {
tccContext.ActionContext = make(map[string]interface{}, 0)
}
return tccContext
}

// getOrCreateBusinessActionContext When the parameters of the prepare method are the following scenarios, obtain the context in the following ways:
// 1. null: create new BusinessActionContext
// 2. tm.BusinessActionContext: return it
// 3. *tm.BusinessActionContext: if nil then create new BusinessActionContext, else return it
// 4. Struct: if there is an attribute of businessactioncontext type and it is not nil, return it
// 5. else: create new BusinessActionContext
func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) *tm.BusinessActionContext {
if params == nil {
return &tm.BusinessActionContext{}
}

switch params.(type) {
case tm.BusinessActionContext:
v := params.(tm.BusinessActionContext)
return &v
case *tm.BusinessActionContext:
v := params.(*tm.BusinessActionContext)
if v != nil {
return v
}
return &tm.BusinessActionContext{}
default:
break
}

var (
typ reflect.Type
val reflect.Value
isStruct bool
)
if isStruct, val, typ = obtainStructValueType(params); !isStruct {
return &tm.BusinessActionContext{}
}
n := typ.NumField()
for i := 0; i < n; i++ {
sf := typ.Field(i)
if sf.Type == rm.TypBusinessContextInterface {
v := val.Field(i).Interface()
if v != nil {
return v.(*tm.BusinessActionContext)
}
}
if sf.Type == reflect.TypeOf(tm.BusinessActionContext{}) && val.Field(i).CanInterface() {
v := val.Field(i).Interface().(tm.BusinessActionContext)
return &v
}
}
return &tm.BusinessActionContext{}
}

// obtainStructValueType check o is struct or pointer type
func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) {
v := reflect.ValueOf(o)
t := reflect.TypeOf(o)
switch v.Kind() {
case reflect.Struct:
return true, v, t
case reflect.Ptr:
return true, v.Elem(), t.Elem()
default:
return false, v, nil
}
tm.SetBusinessActionContext(ctx, actionContext)
return nil
}

func (t *TCCServiceProxy) GetTransactionInfo() tm.TransactionInfo {
Expand Down
Loading