Skip to content

Commit

Permalink
redis cluster ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed Dec 22, 2021
1 parent 91022e2 commit 18c5f53
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 51 deletions.
2 changes: 1 addition & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Store struct {
MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"`
ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"`
DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb.
RedisPrefix string `yaml:"RedisPrefix" default:"{}"` // Redis storage prefix. store data to only one slot in cluster
RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster
}

func (s *Store) IsDB() bool {
Expand Down
3 changes: 3 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ import (
"sync"

"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/dtmcli/dtmimp"
)

var rdb *redis.Client
var once sync.Once

func RedisGet() *redis.Client {
once.Do(func() {
dtmimp.Logf("connecting to redis: %v", Config.Store)
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", Config.Store.Host, Config.Store.Port),
Username: Config.Store.User,
Password: Config.Store.Password,
})
})
Expand Down
98 changes: 48 additions & 50 deletions dtmsvr/storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (s *RedisStore) UpdateBranchesSql(branches []TransBranchStore, updates []st
}

type argList struct {
Keys []string
List []interface{}
}

Expand All @@ -85,6 +86,13 @@ func newArgList() *argList {
return a.AppendRaw(config.Store.RedisPrefix).AppendObject(config.Store.DataExpire)
}

func (a *argList) AppendGid(gid string) *argList {
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_g_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_b_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_u")
return a
}

func (a *argList) AppendRaw(v interface{}) *argList {
a.List = append(a.List, v)
return a
Expand Down Expand Up @@ -114,47 +122,46 @@ func handleRedisResult(ret interface{}, err error) (string, error) {
return s, err
}

func callLua(args []interface{}, lua string) (string, error) {
dtmimp.Logf("calling lua. args: %v\nlua:%s", args, lua)
ret, err := redisGet().Eval(ctx, lua, []string{config.Store.RedisPrefix}, args...).Result()
func callLua(a *argList, lua string) (string, error) {
dtmimp.Logf("calling lua. args: %v\nlua:%s", a, lua)
ret, err := redisGet().Eval(ctx, lua, a.Keys, a.List...).Result()
return handleRedisResult(ret, err)
}

func (s *RedisStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error {
args := newArgList().
a := newArgList().
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(global.NextCronTime.Unix()).
AppendBranches(branches).
List
AppendBranches(branches)
global.Steps = nil
global.Payloads = nil
_, err := callLua(args, `-- MaySaveNewTrans
_, err := callLua(a, `-- MaySaveNewTrans
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', ARGV[1] .. '_g_' .. gs.gid)
local g = redis.call('GET', KEYS[1])
if g ~= false then
return 'UNIQUE_CONFLICT'
end
redis.call('SET', ARGV[1] .. '_g_' .. gs.gid, ARGV[3], 'EX', ARGV[2])
redis.call('ZADD', ARGV[1] .. '_u', ARGV[4], gs.gid)
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], gs.gid)
for k = 5, table.getn(ARGV) do
redis.call('RPUSH', ARGV[1] .. '_b_' .. gs.gid, ARGV[k])
redis.call('RPUSH', KEYS[2], ARGV[k])
end
redis.call('EXPIRE', ARGV[1] .. '_b_' .. gs.gid, ARGV[2])
redis.call('EXPIRE', KEYS[2], ARGV[2])
`)
return err
}

func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) {
args := newArgList().
AppendGid(gid).
AppendObject(&TransGlobalStore{Gid: gid, Status: status}).
AppendRaw(branchStart).
AppendBranches(branches).
List
AppendBranches(branches)
_, err := callLua(args, `
local pre = ARGV[1]
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', pre .. '_g_' .. gs.gid)
local g = redis.call('GET', KEYS[1])
if (g == false) then
return 'NOT_FOUND'
end
Expand All @@ -165,35 +172,34 @@ end
local start = ARGV[4]
for k = 5, table.getn(ARGV) do
if start == "-1" then
redis.call('RPUSH', pre .. '_b_' .. gs.gid, ARGV[k])
redis.call('RPUSH', KEYS[2], ARGV[k])
else
redis.call('LSET', pre .. '_b_' .. gs.gid, start+k-5, ARGV[k])
redis.call('LSET', KEYS[2], start+k-5, ARGV[k])
end
end
redis.call('EXPIRE', pre .. '_b_' .. gs.gid, ARGV[2])
redis.call('EXPIRE', KEYS[2], ARGV[2])
`)
dtmimp.E2P(err)
}

func (s *RedisStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
args := newArgList().AppendObject(global).AppendRaw(old).AppendRaw(finished).List
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished)
_, err := callLua(args, `-- ChangeGlobalStatus
local p = ARGV[1]
local gs = cjson.decode(ARGV[3])
local old = redis.call('GET', p .. '_g_' .. gs.gid)
local old = redis.call('GET', KEYS[1])
if old == false then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= ARGV[4] then
return 'NOT_FOUND'
end
redis.call('SET', p .. '_g_' .. gs.gid, ARGV[3], 'EX', ARGV[2])
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.log(redis.LOG_WARNING, 'finished: ', ARGV[5])
if ARGV[5] == '1' then
redis.call('ZREM', p .. '_u', gs.gid)
redis.call('ZREM', KEYS[3], gs.gid)
end
`)
dtmimp.E2P(err)
Expand All @@ -202,58 +208,50 @@ end
func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore {
expired := time.Now().Add(expireIn).Unix()
next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second).Unix()
args := newArgList().AppendRaw(expired).AppendRaw(next).List
args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next)
lua := `-- LocakOneGlobalTrans
local k = ARGV[1] .. '_u'
local r = redis.call('ZRANGE', k, 0, 0, 'WITHSCORES')
local r = redis.call('ZRANGE', KEYS[3], 0, 0, 'WITHSCORES')
local gid = r[1]
if gid == nil then
return 'NOT_FOUND'
end
local g = redis.call('GET', ARGV[1] .. '_g_' .. gid)
redis.log(redis.LOG_WARNING, 'g is: ', g, 'gid is: ', gid)
if g == false then
redis.call('ZREM', k, gid)
return 'NOT_FOUND'
end
if tonumber(r[2]) > tonumber(ARGV[3]) then
return 'NOT_FOUND'
end
redis.call('ZADD', k, ARGV[4], gid)
return g
redis.call('ZADD', KEYS[3], ARGV[4], gid)
return gid
`
r, err := callLua(args, lua)
for err == ErrShouldRetry {
r, err = callLua(args, lua)
}
if err == ErrNotFound {
return nil
for {
r, err := callLua(args, lua)
if err == ErrNotFound {
return nil
}
dtmimp.E2P(err)
global := s.FindTransGlobalStore(r)
if global != nil {
return global
}
}
dtmimp.E2P(err)
global := &TransGlobalStore{}
dtmimp.MustUnmarshalString(r, global)
return global
}

func (s *RedisStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval
args := newArgList().AppendObject(global).AppendRaw(global.NextCronTime.Unix()).List
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(global.NextCronTime.Unix())
_, err := callLua(args, `-- TouchCronTime
local p = ARGV[1]
local g = cjson.decode(ARGV[3])
local old = redis.call('GET', p .. '_g_' .. g.gid)
local old = redis.call('GET', KEYS[1])
if old == false then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= g.status then
return 'NOT_FOUND'
end
redis.call('ZADD', p .. '_u', ARGV[4], g.gid)
redis.call('SET', p .. '_g_' .. g.gid, ARGV[3], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], g.gid)
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
`)
dtmimp.E2P(err)
}
2 changes: 2 additions & 0 deletions test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func TestMain(m *testing.M) {
} else {
config.Store.Driver = "redis"
config.Store.Host = "localhost"
config.Store.User = ""
config.Store.Password = ""
config.Store.Port = 6379
}
dtmsvr.PopulateDB(false)
Expand Down

0 comments on commit 18c5f53

Please sign in to comment.