Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis optimized #127

Merged
merged 7 commits into from
Dec 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

"github.com/dtm-labs/dtm/bench/svr"
"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
Expand All @@ -23,17 +24,18 @@ func main() {
fmt.Printf(hint)
return
}
logger.Debugf("starting dtm....")
logger.Infof("starting dtm....")
if os.Args[1] == "http" {
fmt.Println("start bench server")
common.MustLoadConfig()
logger.InitLog(common.Config.LogLevel)
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
registry.WaitStoreUp()
dtmsvr.PopulateDB(true)
examples.PopulateDB(true)
dtmsvr.PopulateDB(false)
examples.PopulateDB(false)
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
StartSvr()
svr.StartSvr()
select {}
} else {
fmt.Printf(hint)
Expand Down
2 changes: 2 additions & 0 deletions bench/run-dtm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIM
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench"

ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl"
4 changes: 3 additions & 1 deletion bench/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ chmod +x /usr/local/bin/docker-compose

# install go
wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/bin/go /usr/local/go/bin/go
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go

apt install -y redis
14 changes: 12 additions & 2 deletions bench/http.go → bench/svr/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* license that can be found in the LICENSE file.
*/

package main
package svr

import (
"database/sql"
Expand All @@ -20,6 +20,7 @@ import (
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/examples"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid"
)

// launch command:go run app/main.go qs
Expand All @@ -32,7 +33,7 @@ const total = 200000
var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI)

func sdbGet() *sql.DB {
db, err := dtmimp.PooledDB(common.Config.Store.GetDBConf())
db, err := dtmimp.PooledDB(common.Config.ExamplesDB)
logger.FatalIfError(err)
return db
}
Expand Down Expand Up @@ -167,4 +168,13 @@ func benchAddRoute(app *gin.Engine) {
}
return nil, nil
}))
app.Any(benchAPI+"/benchEmptyUrl", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(examples.DtmHttpServer, gid).
Add("", "", req).
Add("", "", req)
err := saga.Submit()
return nil, err
}))
}
60 changes: 30 additions & 30 deletions dtmsvr/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (a *argList) AppendGid(gid string) *argList {
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_g_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_b_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_u")
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_s_"+gid)
return a
}

Expand Down Expand Up @@ -141,19 +142,21 @@ func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(global.NextCronTime.Unix()).
AppendRaw(global.Gid).
AppendRaw(global.Status).
AppendBranches(branches)
global.Steps = nil
global.Payloads = nil
_, err := callLua(a, `-- MaySaveNewTrans
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', KEYS[1])
if g ~= false then
return 'UNIQUE_CONFLICT'
end

redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], gs.gid)
for k = 5, table.getn(ARGV) do
redis.call('SET', KEYS[4], ARGV[6], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], ARGV[5])
for k = 7, table.getn(ARGV) do
redis.call('RPUSH', KEYS[2], ARGV[k])
end
redis.call('EXPIRE', KEYS[2], ARGV[2])
Expand All @@ -164,17 +167,12 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
args := newArgList().
AppendGid(gid).
AppendObject(&storage.TransGlobalStore{Gid: gid, Status: status}).
AppendRaw(status).
AppendRaw(branchStart).
AppendBranches(branches)
_, err := callLua(args, `
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', KEYS[1])
if (g == false) then
return 'NOT_FOUND'
end
local js = cjson.decode(g)
if js.status ~= gs.status then
_, err := callLua(args, `-- LockGlobalSaveBranches
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[3] then
return 'NOT_FOUND'
end
local start = ARGV[4]
Expand All @@ -193,20 +191,22 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished)
args := newArgList().
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(old).
AppendRaw(finished).
AppendRaw(global.Gid).
AppendRaw(newStatus)
_, err := callLua(args, `-- ChangeGlobalStatus
local gs = cjson.decode(ARGV[3])
local old = redis.call('GET', KEYS[1])
if old == false then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= ARGV[4] then
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[4] then
return 'NOT_FOUND'
end
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.call('SET', KEYS[4], ARGV[7], 'EX', ARGV[2])
if ARGV[5] == '1' then
redis.call('ZREM', KEYS[3], gs.gid)
redis.call('ZREM', KEYS[3], ARGV[6])
end
`)
dtmimp.E2P(err)
Expand Down Expand Up @@ -246,18 +246,18 @@ func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInt
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(global.NextCronTime.Unix())
args := newArgList().
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(global.NextCronTime.Unix()).
AppendRaw(global.Status).
AppendRaw(global.Gid)
_, err := callLua(args, `-- TouchCronTime
local g = cjson.decode(ARGV[3])
local old = redis.call('GET', KEYS[1])
if old == false then
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[5] then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= g.status then
return 'NOT_FOUND'
end
redis.call('ZADD', KEYS[3], ARGV[4], g.gid)
redis.call('ZADD', KEYS[3], ARGV[4], ARGV[6])
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
`)
dtmimp.E2P(err)
Expand Down
4 changes: 2 additions & 2 deletions dtmsvr/storage/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"math"
"time"

"github.com/google/uuid"
"github.com/lithammer/shortuuid/v3"
"gorm.io/gorm"
"gorm.io/gorm/clause"

Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlob
}
expire := int(expireIn / time.Second)
whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire))
owner := uuid.NewString()
owner := shortuuid.New()
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Expand Down
5 changes: 2 additions & 3 deletions dtmsvr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import (
"fmt"
"time"

"github.com/google/uuid"

"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/lithammer/shortuuid/v3"
)

type branchStatus struct {
Expand All @@ -38,7 +37,7 @@ var TransProcessedTestChan chan string = nil

// GenGid generate gid, use uuid
func GenGid() string {
return uuid.NewString()
return shortuuid.New()
}

// GetTransGlobal construct trans from db
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.3
github.com/lithammer/shortuuid v2.0.3+incompatible
github.com/lithammer/shortuuid/v3 v3.0.7
github.com/onsi/gomega v1.16.0
github.com/polarismesh/grpc-go-polaris v0.0.0-20211128162137-1a59cd7b5733 // indirect
github.com/prometheus/client_golang v1.11.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
Expand Down Expand Up @@ -369,6 +370,11 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lithammer/shortuuid v1.0.0 h1:kdcbvjGVEgqeVeDIRtnANOi/F6ftbKrtbxY+cjQmK1Q=
github.com/lithammer/shortuuid v2.0.3+incompatible h1:ao1r3cQ9AUX+c6dZXwbCM/ELGf10EoO4SyqqxBXTyHc=
github.com/lithammer/shortuuid v2.0.3+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -467,6 +473,7 @@ github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OK
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
Expand Down
2 changes: 2 additions & 0 deletions test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/examples"
"github.com/gin-gonic/gin"
Expand All @@ -26,6 +27,7 @@ func exitIf(code int) {

func TestMain(m *testing.M) {
common.MustLoadConfig()
logger.InitLog(config.LogLevel)
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
dtmsvr.TransProcessedTestChan = make(chan string, 1)
dtmsvr.NowForwardDuration = 0 * time.Second
Expand Down