Merge pull request #60 from youzan/optimize-latency
Optimize cluster syncer and write latency
absolute8511 authored Mar 5, 2020
2 parents 3e931d0 + 22c1a78 commit d2db9b9
Showing 34 changed files with 858 additions and 250 deletions.
6 changes: 5 additions & 1 deletion node/keys.go
@@ -139,7 +139,11 @@ func (nd *KVNode) setnxCommand(cmd redcon.Command) (interface{}, error) {
err := fmt.Errorf("ERR wrong number arguments for '%v' command", string(cmd.Args[0]))
return nil, err
}
ex, _ := nd.store.KVExists(cmd.Args[1])
key, err := common.CutNamesapce(cmd.Args[1])
if err != nil {
return nil, err
}
ex, _ := nd.store.KVExists(key)
if ex == 1 {
// already exist
return int64(0), nil
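Why this matters: the old check passed the raw client argument, still carrying its namespace prefix, to KVExists, so the existence probe did not use the same key form as the write path. A minimal sketch of what a namespace-cutting helper like common.CutNamesapce is assumed to do (the actual implementation is not part of this diff):

package main

import (
	"bytes"
	"errors"
	"fmt"
)

// cutNamespace sketches the assumed behavior of common.CutNamesapce:
// drop the leading "namespace:" segment of a raw client key so the
// storage layer always sees the stripped key form.
func cutNamespace(rawKey []byte) ([]byte, error) {
	idx := bytes.IndexByte(rawKey, ':')
	if idx <= 0 || idx == len(rawKey)-1 {
		return nil, errors.New("invalid key: missing namespace prefix")
	}
	return rawKey[idx+1:], nil
}

func main() {
	key, err := cutNamespace([]byte("ns:table:foo"))
	fmt.Println(string(key), err) // table:foo <nil>
}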
200 changes: 134 additions & 66 deletions node/node.go
@@ -29,6 +29,7 @@ var enableSnapTransferTest = false
var enableSnapSaveTest = false
var enableSnapApplyTest = false
var enableSnapApplyRestoreStorageTest = false
var UseRedisV2 = false

func EnableSnapForTest(transfer bool, save bool, apply bool, restore bool) {
enableSnapTransferTest = transfer
@@ -51,6 +52,7 @@ const (
RedisReq int8 = 0
CustomReq int8 = 1
SchemaChangeReq int8 = 2
RedisV2Req int8 = 3
proposeTimeout = time.Second * 4
proposeQueueLen = 800
raftSlow = time.Millisecond * 200
@@ -108,6 +110,7 @@ type waitReqHeaders struct {
wr wait.WaitResult
done chan struct{}
reqs BatchInternalRaftRequest
buf *bytes.Buffer
pool *sync.Pool
}

@@ -116,6 +119,9 @@ func (wrh *waitReqHeaders) release() {
wrh.wr = nil
if wrh.pool != nil {
wrh.reqs.Reqs = wrh.reqs.Reqs[:0]
if wrh.buf != nil {
wrh.buf.Reset()
}
wrh.pool.Put(wrh)
}
}
@@ -133,6 +139,7 @@ func newWaitReqPoolArray() waitReqPoolArray {
obj := &waitReqHeaders{}
obj.reqs.Reqs = make([]InternalRaftRequest, 0, 1)
obj.done = make(chan struct{}, 1)
obj.buf = &bytes.Buffer{}
obj.pool = waitReqPool
return obj
}
@@ -146,6 +153,7 @@ func (wa waitReqPoolArray) getWaitReq(idLen int) *waitReqHeaders {
obj := &waitReqHeaders{}
obj.reqs.Reqs = make([]InternalRaftRequest, 0, idLen)
obj.done = make(chan struct{}, 1)
obj.buf = &bytes.Buffer{}
return obj
}
index := 0
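The pooled waitReqHeaders objects above are drawn from an array of pools indexed by request count, so a single-command write never pays for an oversized pre-allocated slice. A condensed sketch of that size-bucketed pooling idea (bucket capacities here are illustrative; the repo's waitReqPoolArray sizing is not shown in this hunk):

package main

import (
	"fmt"
	"sync"
)

// Bucket capacities are illustrative, not taken from the repo.
var bucketCaps = []int{1, 4, 16, 64}

// waitReq is a stand-in for waitReqHeaders, keeping only the pooled slice.
type waitReq struct {
	reqs []uint64
}

var pools = buildPools()

func buildPools() []*sync.Pool {
	ps := make([]*sync.Pool, len(bucketCaps))
	for i, c := range bucketCaps {
		c := c // capture the per-bucket capacity
		ps[i] = &sync.Pool{New: func() interface{} {
			return &waitReq{reqs: make([]uint64, 0, c)}
		}}
	}
	return ps
}

// getWaitReq returns an object from the smallest bucket that covers idLen,
// falling back to a one-off allocation for oversized requests. A matching
// release would reset the slice and Put the object back into its bucket.
func getWaitReq(idLen int) *waitReq {
	for i, c := range bucketCaps {
		if idLen <= c {
			return pools[i].Get().(*waitReq)
		}
	}
	return &waitReq{reqs: make([]uint64, 0, idLen)}
}

func main() {
	w := getWaitReq(1)
	fmt.Println(cap(w.reqs)) // 1: single-command writes stay on the smallest bucket
}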
@@ -492,23 +500,37 @@ func (nd *KVNode) GetMergeHandler(cmd string) (common.MergeCommandFunc, bool, bo

func (nd *KVNode) ProposeInternal(ctx context.Context, irr InternalRaftRequest, cancel context.CancelFunc, start time.Time) (*waitReqHeaders, error) {
wrh := nd.wrPools.getWaitReq(1)
wrh.reqs.Reqs = append(wrh.reqs.Reqs, irr)
wrh.reqs.ReqNum = 1
wrh.reqs.Timestamp = irr.Header.Timestamp
if len(wrh.done) != 0 {
wrh.done = make(chan struct{}, 1)
}
poolCost := time.Since(start)
buffer, err := wrh.reqs.Marshal()
marshalCost := time.Since(start)
// buffer will be reused by raft?
// TODO:buffer, err := reqList.MarshalTo()
if err != nil {
wrh.release()
return nil, err
var e raftpb.Entry
if irr.Header.DataType == int32(RedisV2Req) {
e.DataType = irr.Header.DataType
e.Timestamp = irr.Header.Timestamp
e.ID = irr.Header.ID
e.Data = irr.Data
} else {
wrh.reqs.Reqs = append(wrh.reqs.Reqs, irr)
wrh.reqs.ReqNum = 1
needSize := wrh.reqs.Size()
wrh.buf.Grow(needSize)
b := wrh.buf.Bytes()
//buffer, err := wrh.reqs.Marshal()
// buffer will be reused by raft
n, err := wrh.reqs.MarshalTo(b[:needSize])
if err != nil {
wrh.release()
return nil, err
}
rbuf := make([]byte, n)
copy(rbuf, b[:n])
e.Data = rbuf
}
marshalCost := time.Since(start)
wrh.wr = nd.w.RegisterWithC(irr.Header.ID, wrh.done)
err = nd.rn.node.ProposeWithDrop(ctx, buffer, cancel)
err := nd.rn.node.ProposeEntryWithDrop(ctx, e, cancel)
if err != nil {
nd.rn.Infof("propose failed : %v", err.Error())
nd.w.Trigger(irr.Header.ID, err)
@@ -518,7 +540,7 @@ func (nd *KVNode) ProposeRawAndWait(buffer []byte, term uint64, index uint64, ra
proposalCost := time.Since(start)
if proposalCost >= raftSlow/2 {
nd.rn.Infof("raft slow for propose buf: %v, cost %v-%v-%v",
len(buffer), poolCost, marshalCost, proposalCost)
len(e.Data), poolCost, marshalCost, proposalCost)
}
return wrh, nil
}
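The else-branch above replaces reqs.Marshal(), which allocates a fresh slice on every proposal, with Size()/Grow()/MarshalTo() into the pooled bytes.Buffer, then copies the result once because raft may retain the proposed slice after the pooled buffer is reused. A standalone sketch of that pattern, using a stand-in message type since the protobuf-generated BatchInternalRaftRequest is not reproduced here:

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// sizedMarshaler stands in for a protobuf-generated type with Size/MarshalTo.
type sizedMarshaler interface {
	Size() int
	MarshalTo(dst []byte) (int, error)
}

var scratchPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}

// marshalWithPool marshals msg into pooled scratch space and returns a
// private copy, so the pooled buffer can be reused immediately while the
// returned slice is handed to raft.
func marshalWithPool(msg sizedMarshaler) ([]byte, error) {
	buf := scratchPool.Get().(*bytes.Buffer)
	defer func() { buf.Reset(); scratchPool.Put(buf) }()

	need := msg.Size()
	buf.Grow(need)   // ensure capacity once instead of allocating per call
	b := buf.Bytes() // len 0, cap >= need after Grow
	n, err := msg.MarshalTo(b[:need])
	if err != nil {
		return nil, err
	}
	out := make([]byte, n) // raft may keep this slice, so copy out of the pool
	copy(out, b[:n])
	return out, nil
}

// rawMsg is a toy message that makes the sketch runnable.
type rawMsg []byte

func (m rawMsg) Size() int                         { return len(m) }
func (m rawMsg) MarshalTo(dst []byte) (int, error) { return copy(dst, m), nil }

func main() {
	out, err := marshalWithPool(rawMsg("set foo bar"))
	fmt.Println(string(out), err)
}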
@@ -539,86 +561,97 @@ func (nd *KVNode) IsWriteReady() bool {
return atomic.LoadInt32(&nd.rn.memberCnt) > int32(rep/2)
}

func (nd *KVNode) ProposeRawAndWait(buffer []byte, term uint64, index uint64, raftTs int64) error {
var reqList BatchInternalRaftRequest
err := reqList.Unmarshal(buffer)
if err != nil {
nd.rn.Infof("propose raw failed: %v at (%v-%v)", err.Error(), term, index)
return err
}
if nodeLog.Level() >= common.LOG_DETAIL {
nd.rn.Infof("propose raw (%v): %v at (%v-%v)", len(buffer), buffer, term, index)
}
func (nd *KVNode) ProposeRawAsyncFromSyncer(buffer []byte, reqList *BatchInternalRaftRequest, term uint64, index uint64, raftTs int64) (*FutureRsp, *BatchInternalRaftRequest, error) {
reqList.Type = FromClusterSyncer
reqList.ReqId = nd.rn.reqIDGen.Next()
reqList.OrigTerm = term
reqList.OrigIndex = index
if reqList.Timestamp != raftTs {
return fmt.Errorf("invalid sync raft request for mismatch timestamp: %v vs %v", reqList.Timestamp, raftTs)
return nil, reqList, fmt.Errorf("invalid sync raft request for mismatch timestamp: %v vs %v", reqList.Timestamp, raftTs)
}

for _, req := range reqList.Reqs {
// re-generate the req id to override the id from log
req.Header.ID = nd.rn.reqIDGen.Next()
}
dataLen := reqList.Size()
var err error
if dataLen <= len(buffer) {
n, err := reqList.MarshalTo(buffer[:dataLen])
if err != nil {
return err
return nil, reqList, err
}
if n != dataLen {
return errors.New("marshal length mismatch")
return nil, reqList, errors.New("marshal length mismatch")
}
} else {
buffer, err = reqList.Marshal()
if err != nil {
return err
return nil, reqList, err
}
}
start := time.Now()

// must register before propose
wr := nd.w.Register(reqList.ReqId)
ctx, cancel := context.WithTimeout(context.Background(), proposeTimeout)
if nodeLog.Level() >= common.LOG_DETAIL {
nd.rn.Infof("propose raw after rewrite(%v): %v at (%v-%v)", dataLen, buffer[:dataLen], term, index)
}
defer cancel()
err = nd.rn.node.ProposeWithDrop(ctx, buffer[:dataLen], cancel)
if err != nil {
cancel()
nd.w.Trigger(reqList.ReqId, err)
return err
return nil, reqList, err
}
var ok bool
var rsp interface{}
select {
case <-wr.WaitC():

var futureRsp FutureRsp
futureRsp.waitFunc = func() (interface{}, error) {
var rsp interface{}
var ok bool
var err error
// will always return a response, timed out or get a error
select {
case <-ctx.Done():
err = ctx.Err()
if err == context.Canceled {
// proposal canceled can be caused by leader transfer or no leader
err = ErrProposalCanceled
}
nd.w.Trigger(reqList.ReqId, err)
<-wr.WaitC()
case <-wr.WaitC():
}
rsp = wr.GetResult()
cancel()
if err, ok = rsp.(error); ok {
rsp = nil
//nd.rn.Infof("request return error: %v, %v", req.String(), err.Error())
} else {
err = nil
}
// we can avoid wait on stop since ctx will be returned (avoid concurrency performance degrade)
//case <-nd.stopChan:
// err = common.ErrStopped
case <-ctx.Done():
err = ctx.Err()
if err == context.DeadlineExceeded {
nd.rn.Infof("propose timeout: %v", err.Error())
}
if err == context.Canceled {
// proposal canceled can be caused by leader transfer or no leader
err = ErrProposalCanceled
nd.rn.Infof("propose canceled ")
}
nd.w.Trigger(reqList.ReqId, err)
return rsp, err
}
return &futureRsp, reqList, nil
}
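ProposeRawAsyncFromSyncer now returns a FutureRsp whose blocking wait is deferred into a closure, so the cluster syncer can issue the next proposal before waiting on the previous one. A minimal sketch of that propose-then-wait-later split (the channel plumbing here is simplified and illustrative; the real code registers with a wait registry and proposes to raft):

package main

import (
	"errors"
	"fmt"
)

// futureRsp defers the blocking wait until the caller asks for the result.
type futureRsp struct {
	waitFunc func() (interface{}, error)
}

func (f *futureRsp) WaitRsp() (interface{}, error) { return f.waitFunc() }

// proposeAsync simulates submitting a proposal and returning immediately;
// the result arrives later on the given channel.
func proposeAsync(result <-chan interface{}) *futureRsp {
	return &futureRsp{waitFunc: func() (interface{}, error) {
		rsp, ok := <-result
		if !ok {
			return nil, errors.New("proposal dropped")
		}
		if err, isErr := rsp.(error); isErr {
			return nil, err
		}
		return rsp, nil
	}}
}

func main() {
	// pipeline two proposals, then wait on both
	c1, c2 := make(chan interface{}, 1), make(chan interface{}, 1)
	f1, f2 := proposeAsync(c1), proposeAsync(c2)
	c1 <- "OK"
	c2 <- "OK"
	fmt.Println(f1.WaitRsp())
	fmt.Println(f2.WaitRsp())
}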

func (nd *KVNode) ProposeRawAndWaitFromSyncer(reqList *BatchInternalRaftRequest, term uint64, index uint64, raftTs int64) error {
f, _, err := nd.ProposeRawAsyncFromSyncer(nil, reqList, term, index, raftTs)
if err != nil {
return err
}
start := time.Now()
rsp, err := f.WaitRsp()
if err != nil {
return err
}
var ok bool
if err, ok = rsp.(error); ok {
return err
}

cost := time.Since(start).Nanoseconds()
for _, req := range reqList.Reqs {
if req.Header.DataType == int32(RedisReq) {
nd.clusterWriteStats.UpdateWriteStats(int64(len(req.Data)), cost/1000)
if req.Header.DataType == int32(RedisReq) || req.Header.DataType == int32(RedisV2Req) {
nd.UpdateWriteStats(int64(len(req.Data)), cost/1000)
}
}
if cost >= int64(proposeTimeout.Nanoseconds())/2 {
@@ -627,6 +660,10 @@ func (nd *KVNode) ProposeRawAndWait(buffer []byte, term uint64, index uint64, ra
return err
}

func (nd *KVNode) UpdateWriteStats(vSize int64, latencyUs int64) {
nd.clusterWriteStats.UpdateWriteStats(vSize, latencyUs)
}

type FutureRsp struct {
waitFunc func() (interface{}, error)
rspHandle func(interface{}) (interface{}, error)
Expand Down Expand Up @@ -689,11 +726,25 @@ func (nd *KVNode) queueRequest(start time.Time, req InternalRaftRequest) (*Futur
return &futureRsp, nil
}

func (nd *KVNode) ProposeAsync(buf []byte) (*FutureRsp, error) {
func (nd *KVNode) RedisV2ProposeAsync(buf []byte) (*FutureRsp, error) {
h := RequestHeader{
ID: nd.rn.reqIDGen.Next(),
DataType: int32(RedisV2Req),
}
raftReq := InternalRaftRequest{
Header: h,
Data: buf,
}
start := time.Now()
return nd.queueRequest(start, raftReq)
}
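RedisV2ProposeAsync carries the raw command bytes straight into the raft entry (see the RedisV2Req branch in ProposeInternal above), while the RedisProposeAsync path below still wraps them in a batched request; which path a handler uses is presumably gated by the UseRedisV2 flag introduced in this change. A hypothetical caller-side dispatch, not code from the repo, with simplified return values:

package main

import "fmt"

// fakeNode stands in for *node.KVNode; only the two propose entry points
// relevant here are modeled, and their return values are simplified.
type fakeNode struct{}

func (n *fakeNode) RedisProposeAsync(buf []byte) error {
	fmt.Println("v1: wrapped into a batched raft request:", string(buf))
	return nil
}

func (n *fakeNode) RedisV2ProposeAsync(buf []byte) error {
	fmt.Println("v2: raw bytes placed directly in the raft entry:", string(buf))
	return nil
}

// UseRedisV2 mirrors the package-level switch added in node.go.
var UseRedisV2 = true

// proposeCommand is a hypothetical dispatch between the two paths.
func proposeCommand(n *fakeNode, rawCmd []byte) error {
	if UseRedisV2 {
		return n.RedisV2ProposeAsync(rawCmd)
	}
	return n.RedisProposeAsync(rawCmd)
}

func main() {
	_ = proposeCommand(&fakeNode{}, []byte("SET k v"))
}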

func (nd *KVNode) RedisProposeAsync(buf []byte) (*FutureRsp, error) {
h := RequestHeader{
ID: nd.rn.reqIDGen.Next(),
DataType: int32(RedisReq),
}

raftReq := InternalRaftRequest{
Header: h,
Data: buf,
@@ -702,7 +753,7 @@ func (nd *KVNode) ProposeAsync(buf []byte) (*FutureRsp, error) {
return nd.queueRequest(start, raftReq)
}

func (nd *KVNode) Propose(buf []byte) (interface{}, error) {
func (nd *KVNode) RedisPropose(buf []byte) (interface{}, error) {
h := RequestHeader{
ID: nd.rn.reqIDGen.Next(),
DataType: int32(RedisReq),
@@ -718,7 +769,7 @@ func (nd *KVNode) Propose(buf []byte) (interface{}, error) {
}
rsp, err := fr.WaitRsp()
cost := time.Since(start)
nd.clusterWriteStats.UpdateWriteStats(int64(len(raftReq.Data)), cost.Nanoseconds()/1000)
nd.UpdateWriteStats(int64(len(raftReq.Data)), cost.Nanoseconds()/1000)
if err == nil && !nd.IsWriteReady() {
nd.rn.Infof("write request %v on raft success but raft member is less than replicator",
raftReq.String())
@@ -1077,22 +1128,32 @@ func (nd *KVNode) applyConfChangeEntry(evnt raftpb.Entry, confState *raftpb.Conf

func (nd *KVNode) applyEntry(evnt raftpb.Entry, isReplaying bool, batch IBatchOperator) bool {
forceBackup := false
var reqList BatchInternalRaftRequest
isRemoteSnapTransfer := false
isRemoteSnapApply := false
if evnt.Data != nil {
// try redis command
var reqList BatchInternalRaftRequest
parseErr := reqList.Unmarshal(evnt.Data)
if parseErr != nil {
nd.rn.Infof("parse request failed: %v, data len %v, entry: %v, raw:%v",
parseErr, len(evnt.Data), evnt,
evnt.String())
if evnt.DataType == int32(RedisV2Req) {
var r InternalRaftRequest
r.Header.ID = evnt.ID
r.Header.Timestamp = evnt.Timestamp
r.Header.DataType = evnt.DataType
r.Data = evnt.Data
reqList.ReqNum = 1
reqList.Reqs = append(reqList.Reqs, r)
reqList.Timestamp = evnt.Timestamp
} else {
parseErr := reqList.Unmarshal(evnt.Data)
if parseErr != nil {
nd.rn.Errorf("parse request failed: %v, data len %v, entry: %v, raw:%v",
parseErr, len(evnt.Data), evnt,
evnt.String())
}
}
if len(reqList.Reqs) != int(reqList.ReqNum) {
nd.rn.Infof("request check failed %v, real len:%v",
reqList, len(reqList.Reqs))
}

isRemoteSnapTransfer := false
isRemoteSnapApply := false
if reqList.Type == FromClusterSyncer {
isApplied := nd.isAlreadyApplied(reqList)
// check if retrying duplicate req, we can just ignore old retry
Expand All @@ -1110,12 +1171,19 @@ func (nd *KVNode) applyEntry(evnt raftpb.Entry, isReplaying bool, batch IBatchOp
return false
}
isRemoteSnapTransfer, isRemoteSnapApply = nd.preprocessRemoteSnapApply(reqList)
if !isRemoteSnapApply && !isRemoteSnapTransfer {
// check if the commit index is continue on remote
if !nd.isContinueCommit(reqList) {
nd.rn.Errorf("request %v-%v is not continue while syncing from remote", reqList.OrigTerm, reqList.OrigIndex)
}
}
}
var retErr error
forceBackup, retErr = nd.sm.ApplyRaftRequest(isReplaying, batch, reqList, evnt.Term, evnt.Index, nd.stopChan)
if reqList.Type == FromClusterSyncer {
nd.postprocessRemoteSnapApply(reqList, isRemoteSnapTransfer, isRemoteSnapApply, retErr)
}
}
// if event.Data is nil, maybe some other event like the leader transfer
var retErr error
forceBackup, retErr = nd.sm.ApplyRaftRequest(isReplaying, batch, reqList, evnt.Term, evnt.Index, nd.stopChan)
if reqList.Type == FromClusterSyncer {
nd.postprocessRemoteApply(reqList, isRemoteSnapTransfer, isRemoteSnapApply, retErr)
}
return forceBackup
}
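On the apply side, a RedisV2Req entry is rebuilt into a one-element request batch directly from the entry's header fields, so single-command writes skip the protobuf unmarshal entirely. A condensed sketch of that decode decision, using simplified local types rather than the repo's raftpb.Entry and InternalRaftRequest definitions:

package main

import (
	"errors"
	"fmt"
)

const redisV2Req int32 = 3

// entry and request keep only the fields the fast path needs.
type entry struct {
	ID        uint64
	Timestamp int64
	DataType  int32
	Data      []byte
}

type request struct {
	id        uint64
	timestamp int64
	data      []byte
}

// decodeEntry takes the unmarshal-free path for v2 entries and falls back
// to the batch decoder otherwise.
func decodeEntry(e entry, unmarshalBatch func([]byte) ([]request, error)) ([]request, error) {
	if e.DataType == redisV2Req {
		// fast path: the entry already carries one raw redis command
		return []request{{id: e.ID, timestamp: e.Timestamp, data: e.Data}}, nil
	}
	if unmarshalBatch == nil {
		return nil, errors.New("no batch decoder provided")
	}
	return unmarshalBatch(e.Data)
}

func main() {
	reqs, _ := decodeEntry(entry{ID: 1, DataType: redisV2Req, Data: []byte("SET k v")}, nil)
	fmt.Println(len(reqs), string(reqs[0].data))
}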
3 changes: 2 additions & 1 deletion node/node_cmd_reg.go
@@ -164,8 +164,9 @@ func (nd *KVNode) registerHandler() {
nd.router.RegisterRead("scard", wrapReadCommandK(nd.scardCommand))
nd.router.RegisterRead("sismember", wrapReadCommandKSubkey(nd.sismemberCommand))
nd.router.RegisterRead("smembers", wrapReadCommandK(nd.smembersCommand))
nd.router.RegisterRead("srandmember", wrapReadCommandKAnySubkey(nd.srandmembersCommand))
nd.router.RegisterWrite("spop", nd.spopCommand)
nd.router.RegisterWrite("sadd", wrapWriteCommandKSubkeySubkey(nd, checkAndRewriteIntRsp))
nd.router.RegisterWrite("sadd", nd.saddCommand)
nd.router.RegisterWrite("srem", wrapWriteCommandKSubkeySubkey(nd, checkAndRewriteIntRsp))
nd.router.RegisterWrite("sclear", wrapWriteCommandK(nd, checkAndRewriteIntRsp))
// for ttl