From 16ad9fd937ed1c498aac67423fa3c1c88b15c79a Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Mon, 1 Nov 2021 15:54:55 +0800 Subject: [PATCH 1/4] etcd_worker: add rate limiter to limit etcd worker ticks frequency --- pkg/orchestrator/etcd_worker.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index b5cfc4a6c42..8cad63deb3c 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // EtcdWorker handles all interactions with Etcd @@ -120,7 +121,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, sessionDone = make(chan struct{}) } lastReceivedEventTime := time.Now() - + // limit the frequency of EtcdWorker ticks + rl := rate.NewLimiter(10, 30) for { var response clientv3.WatchResponse select { @@ -137,7 +139,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } case response = <-watchCh: // In this select case, we receive new events from Etcd, and call handleEvent if appropriate. 
- if err := response.Err(); err != nil { return errors.Trace(err) } @@ -184,6 +185,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err := worker.applyUpdates(); err != nil { return errors.Trace(err) } + // if !rl.Allow(), skip this Tick to avoid etcd worker tick too frequency + if !rl.Allow() { + continue + } nextState, err := worker.reactor.Tick(ctx, worker.state) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { From 799378bd7e53255473fcd61403243d9eb33f6a82 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Mon, 1 Nov 2021 18:53:27 +0800 Subject: [PATCH 2/4] etcd_worker_test: fix test fail --- pkg/orchestrator/etcd_worker.go | 1 + pkg/orchestrator/etcd_worker_test.go | 21 ++++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 8cad63deb3c..709faa22023 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -189,6 +189,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if !rl.Allow() { continue } + // it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick nextState, err := worker.reactor.Tick(ctx, worker.state) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index dc78d0fce62..72cced6004f 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -281,6 +281,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { type intReactorState struct { val int isUpdated bool + lastVal int } func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { @@ -289,6 +290,12 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er if err != nil { log.Panic("intReactorState", zap.Error(err)) } + // 
As long as we can ensure that val is monotonically increasing, + // we can ensure that the linearizability of state changes + if s.lastVal > s.val { + log.Panic("linearizability check failed, lastVal must less than current val", zap.Int("lastVal", s.lastVal), zap.Int("val", s.val)) + } + s.lastVal = s.val s.isUpdated = !isInit return nil } @@ -298,17 +305,17 @@ func (s *intReactorState) GetPatches() [][]DataPatch { } type linearizabilityReactor struct { - state *intReactorState - expected int + state *intReactorState + tickCount int } func (r *linearizabilityReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*intReactorState) if r.state.isUpdated { - if r.state.val != r.expected { - log.Panic("linearizability check failed", zap.Int("expected", r.expected), zap.Int("actual", r.state.val)) + if r.state.val < r.tickCount { + log.Panic("linearizability check failed, val must larger than tickCount", zap.Int("expected", r.tickCount), zap.Int("actual", r.state.val)) } - r.expected++ + r.tickCount++ } if r.state.val == 1999 { return r.state, cerrors.ErrReactorFinished @@ -334,8 +341,8 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { } reactor, err := NewEtcdWorker(cli0, testEtcdKeyPrefix+"/lin", &linearizabilityReactor{ - state: nil, - expected: 999, + state: nil, + tickCount: 999, }, &intReactorState{ val: 0, isUpdated: false, From f1f969b95fa9159623743e9e70256b9b9024a9c7 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 3 Nov 2021 15:27:26 +0800 Subject: [PATCH 3/4] etcd_worker: use OwnerFlushInterval or ProcessorFlushInterval to calculate tick rate --- pkg/orchestrator/etcd_worker.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 709faa22023..d82b02af6dd 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -121,8 +121,11 @@ func (worker 
*EtcdWorker) Run(ctx context.Context, session *concurrency.Session, sessionDone = make(chan struct{}) } lastReceivedEventTime := time.Now() - // limit the frequency of EtcdWorker ticks - rl := rate.NewLimiter(10, 30) + + // tickRate represents the number of times EtcdWorker can tick + // the reactor per second + tickRate := time.Second / timerInterval + rl := rate.NewLimiter(rate.Limit(tickRate), int(tickRate)*3) for { var response clientv3.WatchResponse select { @@ -185,7 +188,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err := worker.applyUpdates(); err != nil { return errors.Trace(err) } - // if !rl.Allow(), skip this Tick to avoid etcd worker tick too frequency + + // if !rl.Allow(), skip this Tick to avoid etcd worker tick reactor too frequency + // It make etcdWorker to batch etcd changed event in worker.state if !rl.Allow() { continue } From af8cbd9cb7daf493bc9f5c969cc376ccb359c073 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 3 Nov 2021 16:50:19 +0800 Subject: [PATCH 4/4] etcd_worker: set rate limit burst to 1 --- pkg/orchestrator/etcd_worker.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index d82b02af6dd..102fe84db11 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -125,7 +125,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, // tickRate represents the number of times EtcdWorker can tick // the reactor per second tickRate := time.Second / timerInterval - rl := rate.NewLimiter(rate.Limit(tickRate), int(tickRate)*3) + rl := rate.NewLimiter(rate.Limit(tickRate), 1) for { var response clientv3.WatchResponse select { @@ -189,8 +189,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, return errors.Trace(err) } - // if !rl.Allow(), skip this Tick to avoid etcd worker tick reactor too frequency 
- // It make etcdWorker to batch etcd changed event in worker.state + // If !rl.Allow(), skip this Tick so the etcd worker does not tick the reactor too frequently. + // This makes EtcdWorker batch etcd change events in worker.state. + // The semantics of `ReactorState` require that any implementation + // can batch updates internally. if !rl.Allow() { continue }