Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: charlie <[email protected]>
  • Loading branch information
Charlie17Li committed Feb 14, 2023
1 parent 5fcadbe commit 7afed96
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 129 deletions.
10 changes: 2 additions & 8 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ func InitPath(configFilePath string) {
initRmClient(cfg)
initTmClient(cfg)
initDatasource(cfg)
initExecutor(cfg)
}

var (
onceInitTmClient sync.Once
onceInitRmClient sync.Once
onceInitDatasource sync.Once
onceExecutor sync.Once
)

// InitTmClient init client tm client
Expand All @@ -77,9 +75,11 @@ func initRmClient(cfg *Config) {
log.Init()
initRemoting(cfg)
rm.InitRm(rm.RmConfig{
Config: cfg.ClientConfig.RmConfig,
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
})
config.Init(cfg.ClientConfig.RmConfig.LockConfig)
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
Expand All @@ -92,9 +92,3 @@ func initDatasource(cfg *Config) {
datasource.Init()
})
}

func initExecutor(cfg *Config) {
onceExecutor.Do(func() {
config.Init(cfg.ExecutorConfig)
})
}
4 changes: 0 additions & 4 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package client
import (
"flag"
"fmt"

"io/ioutil"
"os"
"path/filepath"
Expand All @@ -34,7 +33,6 @@ import (
"github.com/knadh/koanf/providers/rawbytes"

"github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
Expand Down Expand Up @@ -82,7 +80,6 @@ type Config struct {
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
ExecutorConfig config.Config `yaml:"executor" json:"executor" koanf:"executor"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -100,7 +97,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.GettyConfig.RegisterFlagsWithPrefix("getty", f)
c.TransportConfig.RegisterFlagsWithPrefix("transport", f)
c.ServiceConfig.RegisterFlagsWithPrefix("service", f)
c.ExecutorConfig.RegisterFlagsWithPrefix("executor", f)
}

type loaderConf struct {
Expand Down
20 changes: 5 additions & 15 deletions pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, 30*time.Second, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.ClientConfig.UndoConfig)
Expand Down Expand Up @@ -112,17 +112,12 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, "default", cfg.ServiceConfig.VgroupMapping["default_tx_group"])
assert.Equal(t, "127.0.0.1:8091", cfg.ServiceConfig.Grouplist["default"])

assert.NotNil(t, cfg.ExecutorConfig)
assert.NotNil(t, cfg.ExecutorConfig.ATExecutor)
assert.Equal(t, 3, cfg.ExecutorConfig.ATExecutor.SelectForUpdate.RetryTimes)
assert.Equal(t, 20*time.Millisecond, cfg.ExecutorConfig.ATExecutor.SelectForUpdate.RetryInterval)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}, "executor":{"at":{"select-for-update":{"retry-times":3, "retry-interval":"20ms"}}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
Expand All @@ -143,8 +138,8 @@ func TestLoadJson(t *testing.T) {
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, 30*time.Second, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.ClientConfig.UndoConfig)
Expand Down Expand Up @@ -208,11 +203,6 @@ func TestLoadJson(t *testing.T) {
assert.Equal(t, "default_test", cfg.ServiceConfig.VgroupMapping["default_tx_group"])
assert.Equal(t, "127.0.0.1:8092", cfg.ServiceConfig.Grouplist["default"])

assert.NotNil(t, cfg.ExecutorConfig)
assert.NotNil(t, cfg.ExecutorConfig.ATExecutor)
assert.Equal(t, 3, cfg.ExecutorConfig.ATExecutor.SelectForUpdate.RetryTimes)
assert.Equal(t, 20*time.Millisecond, cfg.ExecutorConfig.ATExecutor.SelectForUpdate.RetryInterval)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}
12 changes: 2 additions & 10 deletions pkg/datasource/sql/exec/at/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package at

import "flag"
import "github.com/seata/seata-go/pkg/rm"

var ATConfig Config

type Config struct {
SelectForUpdate SelectForUpdateExecutorConfig `yaml:"select-for-update" json:"select-for-update" koanf:"select-for-update"`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.SelectForUpdate.RegisterFlagsWithPrefix(prefix+".select-for-update", f)
}
var LockConfig rm.LockConfig
3 changes: 1 addition & 2 deletions pkg/datasource/sql/exec/at/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,11 @@ func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa

// afterImage build after image
func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {

tableName, _ := d.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName)

if err != nil {
return nil, err
}

return types.NewEmptyRecordImage(metaData, types.SQLTypeDelete), nil
}
4 changes: 2 additions & 2 deletions pkg/datasource/sql/exec/at/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"

"github.com/arana-db/parser/ast"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
Expand Down Expand Up @@ -82,7 +83,6 @@ func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam

// beforeImage build before image
func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {

tableName, _ := i.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
Expand Down Expand Up @@ -457,10 +457,10 @@ func (i *insertExecutor) getPkValuesByAuto(ctx context.Context, execCtx *types.E

tableName, _ := i.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)

if err != nil {
return nil, err
}

pkValuesMap := make(map[string][]interface{})
pkMetaMap := metaData.GetPrimaryKeyMap()
if len(pkMetaMap) == 0 {
Expand Down
Loading

0 comments on commit 7afed96

Please sign in to comment.