Write batching strategy #156

Open
FZambia opened this issue Dec 7, 2022 · 34 comments
Labels
enhancement New feature or request

Comments

@FZambia
Contributor

FZambia commented Dec 7, 2022

Hey @rueian, this is me again.

I was preparing Rueidis-based code for release and suddenly discovered an interesting thing. I ran quite a lot of Go benchmarks to make sure the new implementation based on Rueidis produces better operation latency and better throughput. And it does.

I also expected that migrating to Rueidis would give Centrifugo better CPU utilization, since Rueidis makes fewer memory allocations. And here be dragons.

Before making a release, I decided to run macro-benchmarks and found that Centrifugo consumes more CPU than before under equal conditions. Moreover, the Rueidis-based implementation results in more CPU usage on the Redis instance than the previous implementation did. I did not expect that at all. To investigate, I made a repo: https://github.com/FZambia/pipelines.

In that repo I implemented 3 benchmarks: for pipelined Redigo, pipelined Go-Redis and Rueidis.

After running benchmarks I observed the following:

[Video: input_1.mp4]
❯ go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	12460600	       959.6 ns/op	     181 B/op	       4 allocs/op
BenchmarkGoredis-8   	 8069197	      1534 ns/op	     343 B/op	       5 allocs/op
BenchmarkRueidis-8   	19451470	       620.0 ns/op	      80 B/op	       1 allocs/op

Here we can see that CPU usage is:

                     Redigo  Goredis  Rueidis
Application CPU, %   285     270      470
Redis CPU, %         56      34       80

Nothing too special here: all numbers are more or less as expected. Rueidis produced better throughput, so it loaded Redis more, and the price for the better throughput is higher application CPU utilization.

But in Centrifugo's case I compared the CPU usage of Redigo and Rueidis under equal conditions. So I added a rate limiter to the benchmarks in the https://github.com/FZambia/pipelines repo to generate the same load in all cases, limiting it to 100 commands per millisecond (100k per second).

[Video: input_2.mp4]
❯ PIPE_LIMITED=1 go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
BenchmarkRueidis-8   	 1000000	     10000 ns/op	     113 B/op	       2 allocs/op
PASS
ok  	github.com/FZambia/pipelines	30.629s
                     Redigo  Goredis  Rueidis
Application CPU, %   91      96       118
Redis CPU, %         36      34       45

This is more interesting. We are generating the same load in all benchmarks, but both app and Redis CPU usage are worst in the Rueidis case.

It turned out the difference is the result of the different batch sizes we send to Redis. In the Redigo/Goredis case the batches are larger than in the Rueidis case. With Rueidis the batches are smaller, so there are more syscalls both in the app and on the Redis side. As we can see, CPU usage is very sensitive to this.

There is a project called Twemproxy which acts as a proxy between applications and Redis and batches commands automatically, thus reducing load on Redis. So, in general, pipelining is known not only to increase throughput but also to reduce Redis CPU usage. Since Redis is single-threaded, its capacity is actually quite limited.
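To make the syscall argument concrete, here is a minimal sketch (not code from the FZambia/pipelines repo) that counts how many Write calls reach the underlying writer. The countingWriter type and the sendCommands helper are hypothetical: each Write on countingWriter stands in for one write syscall on a net.Conn, and every command is a RESP-encoded PING.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// countingWriter stands in for a net.Conn: every Write call models one
// write syscall reaching the kernel.
type countingWriter struct{ writes int }

func (c *countingWriter) Write(p []byte) (int, error) {
	c.writes++
	return len(p), nil
}

// ping is a RESP-encoded PING command (14 bytes).
const ping = "*1\r\n$4\r\nPING\r\n"

// sendCommands buffers n commands in a bufio.Writer and flushes after every
// flushEvery commands. It returns how many writes hit the "connection".
func sendCommands(n, flushEvery int) int {
	conn := &countingWriter{}
	w := bufio.NewWriterSize(conn, 64<<10) // big enough that no implicit flush happens here
	for i := 1; i <= n; i++ {
		io.WriteString(w, ping)
		if i%flushEvery == 0 {
			w.Flush()
		}
	}
	w.Flush() // flushes any remainder; a no-op when the buffer is empty
	return conn.writes
}

func main() {
	fmt.Println(sendCommands(100, 1), sendCommands(100, 10), sendCommands(100, 100)) // 100 10 1
}
```

The same 100 commands cost 100, 10 or 1 writes depending only on batch size; on the Redis side, larger batches likewise mean roughly fewer reads per command.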

I tried to find a simple way to improve batching of Rueidis somehow. The simplest solution I found at this point is this one: main...FZambia:rueidis:GetWriterEachConn

That is, introducing an option to provide a custom bufio.Writer. I used it like this:

func rueidisClient() rueidis.Client {
	options := rueidis.ClientOption{
		InitAddress:  []string{":6379"},
		DisableCache: true,
	}
	if os.Getenv("PIPE_DELAYED") != "" {
		options.GetWriterEachConn = func(writer io.Writer) (*bufio.Writer, func()) {
			mlw := newDelayWriter(bufio.NewWriterSize(writer, 1<<19), time.Millisecond)
			w := bufio.NewWriterSize(mlw, 1<<19)
			return w, func() { mlw.close() }
		}
	}
	client, err := rueidis.NewClient(options)
	if err != nil {
		log.Fatal(err)
	}
	return client
}


type writeFlusher interface {
	io.Writer
	Flush() error
}

type delayWriter struct {
	dst   writeFlusher
	delay time.Duration // zero means to flush immediately

	mu           sync.Mutex // protects tm, flushPending, and dst.Flush
	tm           *time.Timer
	err          error
	flushPending bool
}

func newDelayWriter(dst writeFlusher, delay time.Duration) *delayWriter {
	return &delayWriter{dst: dst, delay: delay}
}

func (m *delayWriter) Write(p []byte) (n int, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.err != nil {
		return 0, m.err // report the error saved by a failed delayed flush
	}
	n, err = m.dst.Write(p)
	if m.delay <= 0 {
		err = m.dst.Flush()
		return
	}
	if m.flushPending {
		return
	}
	if m.tm == nil {
		m.tm = time.AfterFunc(m.delay, m.delayedFlush)
	} else {
		m.tm.Reset(m.delay)
	}
	m.flushPending = true
	return
}

func (m *delayWriter) delayedFlush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
		return
	}
	err := m.dst.Flush()
	if err != nil {
		m.err = err
	}
	m.flushPending = false
}

func (m *delayWriter) close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.flushPending = false
	if m.tm != nil {
		m.tm.Stop()
	}
}

The delayed writer code is inspired by Caddy's code. It delays writes to the connection: data is buffered immediately, and the flush happens later on a timer.

We sacrifice latency for fewer syscalls.

[Video: input_3.mp4]
❯ PIPE_LIMITED=1 PIPE_DELAYED=1 go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
BenchmarkRueidis-8   	 1000000	     10002 ns/op	     114 B/op	       2 allocs/op
PASS
ok  	github.com/FZambia/pipelines	30.712s
                     Redigo  Goredis  Rueidis
Application CPU, %   91      96       51
Redis CPU, %         36      34       6

From these results we can see that better batching reduces both application and Redis CPU usage, as we make fewer read/write syscalls. For Rueidis, CPU usage of the benchmark process dropped from 118% to 51%, and that of the Redis process from 45% to 6%. An extra millisecond of latency seems tolerable for such a large resource reduction.


Of course, I may have missed something, so it would be interesting to hear your opinion on whether you see potential issues with this approach. Under a different level of parallelism the results may differ, since batch sizes change, and every library in the test may perform better or worse.

I think a resource reduction like this is great to have. In Centrifugo's case, users tend to add more Centrifugo nodes that work with a single Redis instance, so the ability to keep Redis CPU as low as possible is valuable. Perhaps you can suggest a better approach to achieve this.

@rueian
Collaborator

rueian commented Dec 8, 2022

Hi @FZambia,

Thank you very much for your benchmark. It is very impressive.

Your benchmark clearly shows that rueidis' pipe._backgroundWrite() is more aggressive than the select-or-break pipelining technique you used with redigo and goredis. In other words, it loops faster than commands arrive through the Go channel and misses too many chances to batch commands.

Therefore, if we make it slow down a little bit, for example:

diff --git a/pipe.go b/pipe.go
index 8807779..e800a0d 100644
--- a/pipe.go
+++ b/pipe.go
@@ -292,6 +292,7 @@ func (p *pipe) _backgroundWrite() (err error) {
                                err = p.Error()
                        } else {
                                err = p.w.Flush()
+                               time.Sleep(time.Microsecond * 100)
                        }
                        if err == nil {
                                if atomic.LoadInt32(&p.state) == 1 {

Then we can have a similar result:

[Video: slowbatch.mov]

This approach is probably simpler than customizing the bufio writer.

> Under a different level of parallelism the results may differ, since batch sizes change, and every library in the test may perform better or worse.

Indeed. Even the real network environment should also be taken into consideration.

Although it is really hard or almost impossible for users to tweak this kind of delay, I think we can still have an option in ClientOption to slow pipe._backgroundWrite() down.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Actually, I also tried Sleep initially while looking for the cause, but then decided it was too aggressive a change to the library. I was also unsure how blocking at this stage might affect rueidis. In the additional-buffer approach I showed, we do not block the writer loop. But you think blocking it with a sleep won't introduce any downsides other than increased latency, right?

@rueian
Collaborator

rueian commented Dec 8, 2022

Yes, I think so. Maybe the sleep should be added under the if err == nil block.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Thinking more... I'm still worried it's not strictly equivalent to the intermediate buffer, because in the intermediate-buffer approach we never block writing; the timer there is asynchronous. So the time interval used in the intermediate buffer does not play the same role as it does if we add a Sleep to the backgroundWrite loop directly. 🤔 Possibly we can somehow get the best of both worlds and avoid blocking.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Something like this, perhaps? (an optional FlushInterval in ClientOption):

diff --git a/pipe.go b/pipe.go
index 2eb242d..c7a50bd 100644
--- a/pipe.go
+++ b/pipe.go
@@ -46,6 +46,7 @@ type pipe struct {
 	cache           cache
 	r               *bufio.Reader
 	w               *bufio.Writer
+	flushInterval   time.Duration
 	close           chan struct{}
 	onInvalidations func([]RedisMessage)
 	r2psFn          func() (p *pipe, err error)
@@ -82,6 +83,8 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
 		r:     bufio.NewReaderSize(conn, option.ReadBufferEachConn),
 		w:     bufio.NewWriterSize(conn, option.WriteBufferEachConn),

+		flushInterval: time.Millisecond,
+
 		nsubs: newSubs(),
 		psubs: newSubs(),
 		ssubs: newSubs(),
@@ -286,13 +289,37 @@ func (p *pipe) _backgroundWrite() (err error) {
 		ch    chan RedisResult
 	)

+	var mu sync.Mutex
+
+	if p.flushInterval > 0 {
+		go func() {
+			for {
+				select {
+				case <-time.After(p.flushInterval):
+					mu.Lock()
+					err = p.w.Flush()
+					mu.Unlock()
+					if err != nil {
+						// TODO.
+					}
+				case <-p.close:
+					return
+				}
+			}
+		}()
+	}
+
 	for atomic.LoadInt32(&p.state) < 3 {
 		if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil {
+			mu.Lock()
 			if p.w.Buffered() == 0 {
 				err = p.Error()
 			} else {
-				err = p.w.Flush()
+				if p.flushInterval == 0 {
+					err = p.w.Flush()
+				}
 			}
+			mu.Unlock()
 			if err == nil {
 				if atomic.LoadInt32(&p.state) == 1 {
 					ones[0], multi, ch = p.queue.WaitForWrite()
@@ -306,7 +333,9 @@ func (p *pipe) _backgroundWrite() (err error) {
 			multi = ones
 		}
 		for _, cmd := range multi {
+			mu.Lock()
 			err = writeCmd(p.w, cmd.Commands())
+			mu.Unlock()
 		}
 		if err != nil {
 			if err != ErrClosing { // ignore ErrClosing to allow final QUIT command to be sent

(The locks could be avoided if FlushInterval is not used.)

@rueian
Collaborator

rueian commented Dec 9, 2022

Oh… an additional goroutine and lock look too heavy. Furthermore, the lock would also have to be taken on the synchronous path.

> Thinking more... I'm still worried it's not strictly equivalent to the intermediate buffer, because in the intermediate-buffer approach we never block writing; the timer there is asynchronous.

The only difference I can think of is that, while we are sleeping, we may miss some chances to trigger an implicit flush of the bufio writer, which happens when commands exceed the size of its buffer. Given that the sleeping interval should be small, I don't think this difference is a big deal.
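The "implicit flush" here is bufio.Writer's own behavior: when a write does not fit into the remaining buffer space, it fills the buffer, flushes it, and carries on. A small sketch with a hypothetical 28-byte buffer (room for exactly two 14-byte PING commands) shows writes reaching the connection even though Flush is never called; countingWriter and writeWithoutFlush are illustrative names only.

```go
package main

import (
	"bufio"
	"fmt"
)

// countingWriter models a net.Conn; each Write is one write syscall.
type countingWriter struct{ writes int }

func (c *countingWriter) Write(p []byte) (int, error) {
	c.writes++
	return len(p), nil
}

// writeWithoutFlush writes n RESP PING commands (14 bytes each) into a
// bufio.Writer of size bufSize without ever calling Flush. It reports how many
// writes reached the connection anyway and how many bytes are still buffered.
func writeWithoutFlush(n, bufSize int) (writes, buffered int) {
	conn := &countingWriter{}
	w := bufio.NewWriterSize(conn, bufSize)
	for i := 0; i < n; i++ {
		w.WriteString("*1\r\n$4\r\nPING\r\n")
	}
	return conn.writes, w.Buffered()
}

func main() {
	writes, buffered := writeWithoutFlush(10, 28)
	fmt.Println(writes, buffered) // 4 28
}
```

While the writer loop is asleep nothing is written into the buffer, so these size-triggered flushes cannot happen until it wakes up; with a short pause that delay is small.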

@FZambia
Contributor Author

FZambia commented Dec 9, 2022

I tested both approaches: the extra flush goroutine, and the following pause:

if p.flushInterval > 0 {
	select {
	case <-time.After(time.Millisecond):
	case <-p.close:
		return
	}
}

placed after Flush (i.e. basically a Sleep).

For 128 parallelism (old is extra flush goroutine, new is pause in background write loop):

name       old time/op    new time/op    delta
Rueidis-8    1.09µs ± 1%    1.19µs ± 1%  +9.58%  (p=0.000 n=20+20)

name       old alloc/op   new alloc/op   delta
Rueidis-8     80.0B ± 0%     80.0B ± 0%    ~     (all equal)

name       old allocs/op  new allocs/op  delta
Rueidis-8      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

So the pause approach gives slightly worse latency than flushing in a goroutine.

For 1024 parallelism:

name       old time/op    new time/op    delta
Rueidis-8    1.08µs ± 1%    1.21µs ± 1%  +12.01%  (p=0.000 n=17+20)

name       old alloc/op   new alloc/op   delta
Rueidis-8     82.0B ± 0%     82.0B ± 0%     ~     (all equal)

name       old allocs/op  new allocs/op  delta
Rueidis-8      1.00 ± 0%      1.00 ± 0%     ~     (all equal)

Similar picture.


However, app and Redis CPU usage look like this:

For 128 parallelism:

Flush goroutine: App CPU 290%, Redis CPU 42%
Sleep/Pause: App CPU 177%, Redis CPU 38%

For 1024 parallelism:

Flush goroutine: App CPU 388%, Redis CPU 40%
Sleep/Pause: App CPU 280%, Redis CPU 38%

The app CPU reduction in the sleep/pause case seems significant, so I think the sleep/pause approach is OK.

It's worth mentioning that when concurrency is low, both approaches result in significantly lower throughput: something like 8k requests per second instead of 245k rps at parallelism == 1, and 77k rps instead of 1M rps at parallelism == 10. For my use case that's totally OK, as requests come from concurrent parts of the application. I can't figure out at the moment whether it's possible to introduce a more adaptive strategy here, i.e. some factor that increases batch sizes as concurrency and the number of requests grow (but again, for my use case it's not important).

@FZambia
Contributor Author

FZambia commented Dec 9, 2022

I also tested the sequential case: it's 1k rps vs 45k rps.

> for my use case it's not important

Actually, it seems that for some scenarios this may be important... I wonder whether it's possible to avoid sleeping entirely and still get good batches in all cases 🤔 Perhaps by combining the current rueidis approach with the smart batching technique I had before.

@rueian
Collaborator

rueian commented Dec 9, 2022

Hi @FZambia,

> So the pause approach gives slightly worse latency than flushing in a goroutine.

I also got similar results of +10% latency. But I quickly realized that pausing pipe._backgroundWrite() for 1 millisecond was too long if the goal is just to slow it down a little bit.

I think pausing it for 20 microseconds is enough for a local Redis server. Here are the results:

For 128 parallelism (old is extra flush goroutine, new is pause in background write loop):

▶ benchstat old.txt new.txt
name        old time/op    new time/op    delta
Rueidis-10     608ns ± 2%     596ns ± 2%  -1.89%  (p=0.000 n=20+19)

name        old alloc/op   new alloc/op   delta
Rueidis-10     82.0B ± 0%     80.0B ± 0%  -2.44%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
Rueidis-10      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

For 1024 parallelism:

▶ benchstat old.txt new.txt
name        old time/op    new time/op    delta
Rueidis-10     679ns ± 7%     663ns ± 7%  -2.35%  (p=0.013 n=20+20)

name        old alloc/op   new alloc/op   delta
Rueidis-10     83.4B ± 1%     81.0B ± 0%  -2.82%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
Rueidis-10      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

And it still has comparable throughput when parallelism = 1:

▶ go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-10     	 2640541	      4532 ns/op	     173 B/op	       4 allocs/op
BenchmarkGoredis-10    	 2459900	      4871 ns/op	     328 B/op	       6 allocs/op
BenchmarkRueidis-10    	 2707039	      4435 ns/op	      80 B/op	       1 allocs/op
PASS
ok  	github.com/FZambia/pipelines	50.350s

However, for sequential usage, 20 microseconds is still too long. Thankfully it is possible to detect this case:

diff --git a/pipe.go b/pipe.go
index 8807779..28a4bb7 100644
--- a/pipe.go
+++ b/pipe.go
@@ -291,7 +291,13 @@ func (p *pipe) _backgroundWrite() (err error) {
                        if p.w.Buffered() == 0 {
                                err = p.Error()
                        } else {
-                               err = p.w.Flush()
+                               if atomic.LoadInt32(&p.waits) == 1 {
+                                       err = p.w.Flush()
+                               } else {
+                                       ts := time.Now()
+                                       err = p.w.Flush()
+                                       time.Sleep(time.Microsecond*20 - time.Since(ts))
+                               }
                        }
                        if err == nil {
                                if atomic.LoadInt32(&p.state) == 1 {

Also note that in the pausing case we should probably record the elapsed time of p.w.Flush() to take network conditions into account.


> I can't figure out at the moment whether it's possible to introduce a more adaptive strategy here.

I think it is possible, and we could probably support multiple strategies by swapping pipes when a standalone monitoring goroutine finds a better pipe configuration.

@FZambia
Copy link
Contributor Author

FZambia commented Dec 10, 2022

> I think pausing it for 20 microseconds is enough for a local Redis server. Here are the results:

20 microseconds seems to work fine in all scenarios, right! (It actually produces better throughput for me in the non-limited scenario.) But I suppose it should be optional anyway? Or the default, since it improves throughput a bit? And I think it would be nice to tune it for non-local setups where RTT is larger, so we can afford more time to collect batches? Should we also listen to the pipe close channel while sleeping to return quickly?

More advanced strategies are nice to have, though much more complex. I can't even imagine one right now, and they should probably be tested with parallelism, request rates and network latencies all taken into account.

@rueian
Copy link
Collaborator

rueian commented Dec 10, 2022

> But I suppose it should be optional anyway? Or the default, since it improves throughput a bit?

Even though it produces better throughput for non-limited scenario, I think it should be optional because latency is still critical to many users.

> Should we also listen to the pipe close channel while sleeping to return quickly?

Actually, we shouldn't, and it wouldn't have any effect: p.close is closed only after pipe._backgroundWrite() returns.

> And I think it would be nice to tune it for non-local setups where RTT is larger, so we can afford more time to collect batches?

Yes. I propose adding an optional MaxFlushDelay time.Duration to ClientOption. Would you like to change #157 to this proposal?

> More advanced strategies are nice to have, though much more complex. I can't even imagine one right now, and they should probably be tested with parallelism, request rates and network latencies all taken into account.

Sure. Thankfully, pipe._backgroundWrite() runs too fast, which gives us room for future improvement. I think we could probably choose a better sleeping duration automatically based on some statistics, such as per-byte flush latency.

rueian added a commit that referenced this issue Dec 10, 2022
rueian added a commit that referenced this issue Dec 10, 2022
@rueian
Collaborator

rueian commented Dec 10, 2022

Hi @FZambia, many thanks! The MaxFlushDelay option has been released in v0.0.89. I think we can leave this issue open for future improvement.

@lgothard

lgothard commented Dec 20, 2022

@rueian I noticed the following being sent to Redis Enterprise when DisableCache: true is set, with this change:

"HELLO" "3" "AUTH"
"ERR Syntax error in HELLO option 'AUTH'"

@rueian
Copy link
Collaborator

rueian commented Dec 20, 2022

Hi @lgothard,

Would you mind providing the code snippet and how you set up Redis Enterprise?

I have tried to reproduce the error but failed. In my case, Redis Enterprise 6.2.6 returns unknown command 'HELLO' instead of ERR Syntax error in HELLO option 'AUTH'

@lgothard
Copy link

Hey @rueian,

This was my bad. I had my windows confused. I got this error on Redis Stack v6.2.7. On our Redis Enterprise v6.0.16 test server, I don’t see it. Sorry for the confusion.

@rueian
Copy link
Collaborator

rueian commented Dec 29, 2022

> This was my bad. I had my windows confused. I got this error on Redis Stack v6.2.7. On our Redis Enterprise v6.0.16 test server, I don’t see it. Sorry for the confusion.

Hi @lgothard,

The latest version of Redis Stack seems to be v6.2.6-v0. I have tested the following snippet with both v6.2.6-v0 and 7.0.6-RC2 and it worked fine:

package main

import (
	"github.com/rueian/rueidis"
)

func main() {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:  []string{"127.0.0.1:6379"},
		DisableCache: true,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
}

Would you mind providing your code snippet as well?

"HELLO" "3" "AUTH"
"ERR Syntax error in HELLO option 'AUTH'"

The error message seems to indicate that there was no username and password in the AUTH part, but I currently can't find a way to reproduce the situation and provide a proper fix.

@rueian rueian added the help wanted Extra attention is needed label May 14, 2023
@rueian rueian added the enhancement New feature or request label May 24, 2023
@rueian rueian removed the help wanted Extra attention is needed label Oct 3, 2023
@liuguili2020

liuguili2020 commented Jun 12, 2024

Hey @rueian,
I'm not sure if I can ask you this, but we found a flaw in the kernel we're using. Due to interrupt issues, sometimes the CPU can't schedule Rueidis, and then we found the CPU usage of Rueidis increasing rapidly. We think it might be related to what is discussed in this issue.

We captured the following information using the perf tool:
[Screenshot: perf output]

@rueian
Collaborator

rueian commented Jun 12, 2024

> and then we found the CPU usage of Rueidis increasing rapidly

Hi @liuguili2020, did the CPU usage ever go down?

@liuguili2020

liuguili2020 commented Jun 13, 2024

Hey @rueian,
Each time, the CPU stays at 100% for 15 minutes and then returns to normal. It seems to be related to the 15-minute TCP timeout.

Here are the client options we have set:
clientOption = rueidis.ClientOption{
	InitAddress:           []string{0: initAddress},
	ClientTrackingOptions: []string{"PREFIX", Prefix, "BCAST"},
	OnInvalidations:       onInvalidations,
}

Do we need to set other parameters, such as MaxFlushDelay, ConnWriteTimeout, and Dialer? Or could it be caused by the DB tracking feature? Do you have any recommended values?

From the test results, it appears that this function has entered a busy loop: func (p *pipe) _backgroundWrite() (err error).
We use github.com/redis/rueidis v1.0.31.

We found a busy loop here:
[Screenshot]

@rueian
Collaborator

rueian commented Jun 13, 2024

Hi @liuguili2020, I guess the 15-minute busy loop is related to the current graceful shutdown of rueidis.

I just released a patch to https://github.com/redis/rueidis/releases/tag/v1.0.31-cap which limits the duration of the graceful shutdown period. Would you like to give it a try?

If the patch could shorten your busy loop, then we would have a better idea of how to fix the issue.

@qihuaheng

qihuaheng commented Jun 14, 2024

> Hi @liuguili2020, I guess the 15-minute busy loop is related to the current graceful shutdown of rueidis.
>
> I just released a patch to https://github.com/redis/rueidis/releases/tag/v1.0.31-cap which limits the duration of the graceful shutdown period. Would you like to give it a try?
>
> If the patch could shorten your busy loop, then we would have a better idea of how to fix the issue.

Hi @rueian, thanks a lot for your response. I'm on @liuguili2020's team, and I'd like to understand more clearly why the code always enters the 'ErrClosing' logic at Line 445 and Line 448, and what the purpose of 'ErrClosing' is. I think the client is blocked sending data or waiting for a response on a dead connection. I haven't figured out the underlying connection management yet, so I'd like to know why the following settings don't help if it's stuck in the graceful shutdown phase for a fixed 15 minutes:

rueidis.ClientOption.ConnWriteTimeout 10s
rueidis.ClientOption.Dialer.Timeout 5s
rueidis.ClientOption.Dialer.KeepAlive 1s
rueidis.ClientOption.MaxFlushDelay 0s

[Screenshot]

Regarding the 15-minute busy loop, I found this: https://supportportal.juniper.net/s/article/Juniper-PAA-TCP-retransmission-time-interval-in-Monitor?language=en_US

TCP retransmits an unacknowledged packet up to tcp_retries2 sysctl setting times (defaults to 15) using an exponential backoff timeout for which each retransmission timeout is between TCP_RTO_MIN (200 ms) and TCP_RTO_MAX (120 seconds). Once the 15th retry expires (by default), the TCP stack will notify the layers above (ie. app) of a broken connection.

The value of TCP_RTO_MIN and TCP_RTO_MAX is hardcoded in the Linux kernel and defined by the following constants:

#define TCP_RTO_MAX ((unsigned)(120*HZ))
#define TCP_RTO_MIN ((unsigned)(HZ/5))

Linux 2.6+ uses HZ of 1000ms, so TCP_RTO_MIN is ~200 ms and TCP_RTO_MAX is ~120 seconds. Given a default value of tcp_retries set to 15, it means that it takes 924.6 seconds before a broken network link is notified to the upper layer (ie. application), since the connection is detected as broken when the last (15th) retry expires.


@qihuaheng

Hi again. I found #108. Is it a similar issue?
#108 (comment)

rueidis/pipe.go, line 227 in defba73:

<-p.queue.PutOne(cmds.QuitCmd)

There would be a QUIT injected after _backgroundRead.

It was replaced with PING:
https://github.com/redis/rueidis/blob/v1.0.31/pipe.go#L346

I think it's the final PING command mentioned in pipe.go
https://github.com/redis/rueidis/blob/v1.0.31/pipe.go#L445

If I understand correctly, since the error code ErrClosing has been received, indicating that the link has been closed, do we only need to retry a few times instead of constantly retrying? What is the purpose of the final PING that must be sent?

@liuguili2020

liuguili2020 commented Jun 14, 2024

> Hi @liuguili2020, I guess the 15-minute busy loop is related to the current graceful shutdown of rueidis.
>
> I just released a patch to https://github.com/redis/rueidis/releases/tag/v1.0.31-cap which limits the duration of the graceful shutdown period. Would you like to give it a try?
>
> If the patch could shorten your busy loop, then we would have a better idea of how to fix the issue.

Hi @rueian , Thank you for your reply and support.

We got test results for the above patch: the CPU usage only rose to 12% and then dropped after about 15 seconds (05:58:15-05:58:30):
[Screenshot: CPU usage graph]

For comparison, before your modification, our test results were as follows: CPU usage rose to 100% and stayed there for 15 minutes before dropping:
[Screenshot: CPU usage graph]

(The horizontal axis represents time, and the vertical axis represents CPU usage percentage.)

Could you release an official version to fix this issue? Thank you very much.

@rueian
Collaborator

rueian commented Jun 14, 2024

Hi @qihuaheng,

> What is the purpose of the final PING that must be sent?

By receiving the final PONG response, we can make sure that all preceding commands are fulfilled. That is how we do a graceful shutdown.
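Why a final PONG is enough: on a single connection Redis answers pipelined commands in the order they were sent, so the reply to a trailing PING can only arrive after every earlier reply. A toy model of that reasoning follows; fakeServer and gracefulDrain are hypothetical, ignore everything rueidis does beyond reply ordering, and assume cmds contains no PING of its own.

```go
package main

import "fmt"

// fakeServer answers each command in arrival order, like a Redis connection
// processing a pipeline: PING gets "PONG", anything else gets "OK <cmd>".
func fakeServer(in <-chan string, out chan<- string) {
	for cmd := range in {
		if cmd == "PING" {
			out <- "PONG"
		} else {
			out <- "OK " + cmd
		}
	}
	close(out)
}

// gracefulDrain pipelines cmds, appends a final PING, and reads replies until
// that PING's PONG arrives. Because replies are FIFO, the PONG proves every
// earlier command has already been answered.
func gracefulDrain(cmds []string) []string {
	in := make(chan string, len(cmds)+1)
	out := make(chan string, len(cmds)+1) // buffered so the server never blocks
	go fakeServer(in, out)
	for _, c := range cmds {
		in <- c
	}
	in <- "PING" // sentinel marking the end of the pipeline
	close(in)
	var replies []string
	for r := range out {
		replies = append(replies, r)
		if r == "PONG" {
			break
		}
	}
	return replies
}

func main() {
	fmt.Println(gracefulDrain([]string{"SET a 1", "GET a"})) // [OK SET a 1 OK GET a PONG]
}
```

The catch discussed in this thread is the failure case: if the connection is dead, the PONG never comes, so the wait has to be bounded rather than retried indefinitely.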

> do we only need to retry a few times instead of constantly retrying?

Yes, I believe so. The v1.0.31-cap is a quick workaround to that and it indeed stops the busy loop according to your result.

Hi @liuguili2020,

> Could you release an official version to fix this issue? Thank you very much.

There will be an official fix to remove the busy loop completely, and it will take me some time. I think the fix will land in v1.0.40 if everything goes well.

@rueian
Collaborator

rueian commented Jun 14, 2024

@liuguili2020, it seems that you can stably reproduce the busy loop. Could you also share how you reproduce it?

@liuguili2020

> @liuguili2020, it seems that you can stably reproduce the busy loop. Could you also share how you reproduce it?

Hi @rueian ,
We just establish a connection with Redis and ping it every second to check if the connection is normal. We upgraded an internal environment, and in the new environment version, the issue is stably reproducible, whereas it doesn't occur in the old environment version. What we currently know is that the new environment has issues related to interrupted calls. We suspect that the connection with Redis was already lost at that time, but we don't know why the client couldn't detect it.

Our runtime environment is a K8s container (IPVS); the Redis cluster was also restarted, and our program had just started.

@liuguili2020

liuguili2020 commented Jun 19, 2024

Hi @rueian ,
Sorry to bother you again. I have found another issue.
We have Redis master and replica servers, and when a failover occurs, the master server goes down and the replica is promoted to master. We use ping to check if the connection is normal, pinging once every second. However, we have found that during the ten-plus seconds of a Redis server failover, the ping often succeeds. At the same time, our client (rueidis) keeps trying to reconnect to the server continuously (roughly every few milliseconds). Is there a timer to stop the connection attempts after a certain period of unsuccessful attempts? What are the default automatic reconnection interval and maximum number of attempts? Can they be configured?

Thank you very much.
BRs//Guili

@rueian
Collaborator

rueian commented Jun 19, 2024

Hi @liuguili2020,

rueidis will only retry on read-only commands. PING will not be retried.

> However, we have found that during the ten-plus seconds of a Redis server failover, the ping often succeeds.

What happened after 10+ seconds?

> Is there a timer to stop the connection attempts after a certain period of unsuccessful attempts?

The only supported timer for retrying read-only commands is the ctx passed to client.Do(), otherwise you need to disable the behavior completely by setting DisableRetry=true. Why would you want to disable it?

@liuguili2020

Hi @rueian,
We have designed a mechanism in our code to ping once per second to check if the connection with the Redis server is lost. If it is, we will re-establish the client connection. Is this necessary?
Does rueidis inherently detect a disconnection from Redis and attempt to reconnect? If so, could you point me to the specific location of this code?

Thank you very much.
BRs//Guili

@rueian
Collaborator

rueian commented Jun 19, 2024

> We have designed a mechanism in our code to ping once per second to check if the connection with the Redis server is lost. If it is, we will re-establish the client connection. Is this necessary?

It is necessary if you want to failover to another address without the help of Redis Cluster or Redis Sentinel.

Does rueidis inherently detect a disconnection from Redis and attempt to reconnect? If so, could you point me to the specific location of this code?
Yes, here:

rueidis/mux.go

Lines 167 to 170 in 87327f9

if w = m.wire[i].Load().(wire); w == m.init {
if w = m.wireFn(); w != m.dead {
m.setCloseHookOnWire(i, w)
m.wire[i].Store(w)

rueidis marks broken connections as m.init. The next time it picks an m.init connection, it calls the wireFn function to create a real connection.
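The lazy-reconnect idea can be sketched in isolation as below. This is a simplified, hypothetical model using a mutex and plain pointers, not the actual code in mux.go: init and dead are sentinel values, dial stands in for wireFn, and markBroken stands in for the close hook.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a hypothetical connection handle.
type conn struct{ id int }

// lazyMux is a simplified, hypothetical sketch of the pattern described
// above: each slot holds either a live conn or the init sentinel, and the
// next pick of an init slot dials via dial (which plays the role of wireFn).
type lazyMux struct {
	mu    sync.Mutex
	slots []*conn
	init  *conn // sentinel: "no real connection yet"
	dead  *conn // sentinel: dialing failed
	dial  func() (*conn, error)
}

func newLazyMux(n int, dial func() (*conn, error)) *lazyMux {
	m := &lazyMux{slots: make([]*conn, n), init: &conn{id: -1}, dead: &conn{id: -2}, dial: dial}
	for i := range m.slots {
		m.slots[i] = m.init
	}
	return m
}

// pick returns the conn in slot i, dialing a fresh one if the slot is init.
func (m *lazyMux) pick(i int) *conn {
	m.mu.Lock()
	defer m.mu.Unlock()
	if w := m.slots[i]; w != m.init {
		return w
	}
	w, err := m.dial()
	if err != nil {
		return m.dead // the slot stays init, so the next pick dials again
	}
	m.slots[i] = w
	return w
}

// markBroken is what a close hook would do: reset the slot to init so the
// next pick reconnects, without any background reconnect loop.
func (m *lazyMux) markBroken(i int, w *conn) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.slots[i] == w {
		m.slots[i] = m.init
	}
}

func main() {
	dials := 0
	failNext := true
	m := newLazyMux(1, func() (*conn, error) {
		if failNext { // simulate one failed dial during a failover
			failNext = false
			return nil, errors.New("connection refused")
		}
		dials++
		return &conn{id: dials}, nil
	})
	fmt.Println(m.pick(0) == m.dead) // first dial fails
	c1 := m.pick(0)                  // the next pick dials again and succeeds
	m.markBroken(0, c1)
	c2 := m.pick(0) // the broken conn is replaced lazily
	fmt.Println(c1.id, c2.id, dials)
}
```

In this sketch no goroutine polls a dead server; the trade-off is that the first command after a break pays the dial latency.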

@qihuaheng

qihuaheng commented Jun 20, 2024

It is necessary if you want to failover to another address without the help of Redis Cluster or Redis Sentinel.

Hi @rueian, thanks for your reply. What does "the help of Redis Cluster" specifically refer to? Do you mean MOVED and ASK redirection, or CLUSTER SLOTS?

I think these are already supported by rueidis: it internally discovers all other master nodes via MOVED/ASK/CLUSTER SLOTS and automatically connects to them, right?

Does the application layer need to run a long-running liveness PING probe for Redis Cluster failover?
If PONG is returned, does it mean that all master nodes have been pinged and all connections are active?
If an error is returned, does it mean that only the connection between rueidis and the Redis node given as InitAddress (used as the seed) is broken?

We are rethinking the design of the application layer and are inclined to remove the liveness ping mechanism from the application layer and rely entirely on rueidis's underlying connection management. We currently only count the errors returned by rueidis client.Do() and consider them to be temporary errors, which in turn lead to temporary failures of our service callers. This solution is built on the assumption that rueidis can perfectly handle the restart and removal of any Redis pod in the Redis Cluster on Kubernetes in the failover scenario.

From the "Live reconfiguration" chapter of the Redis Cluster spec:

Redis Cluster supports the ability to add and remove nodes while the cluster is running.

Client connections and redirection handling

To be efficient, Redis Cluster clients maintain a map of the current slot configuration. However, this configuration is not required to be up to date. When contacting the wrong node results in a redirection, the client can update its internal slot map accordingly.

Clients usually need to fetch a complete list of slots and mapped node addresses in two different situations:

  • At startup, to populate the initial slots configuration
  • When the client receives a MOVED redirection

Note that a client may handle the MOVED redirection by updating just the moved slot in its table; however, this is usually not efficient because often the configuration of multiple slots will be modified at once. (For example, if a replica is promoted to master, all of the slots served by the old master will be remapped.) It is much simpler to react to a MOVED redirection by fetching the full map of slots to nodes from scratch.
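The redirection errors referred to above have the form MOVED <slot> <host:port> and ASK <slot> <host:port>. A minimal sketch of parsing them and deciding whether to reload the whole slot map follows; parseRedirect and needsFullRefresh are hypothetical names, not a library API.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// redirect is a parsed cluster redirection error.
type redirect struct {
	kind string // "MOVED" or "ASK"
	slot int
	addr string
}

// parseRedirect parses error replies of the form "MOVED <slot> <host:port>"
// or "ASK <slot> <host:port>"; any other text yields ok == false.
func parseRedirect(msg string) (redirect, bool) {
	f := strings.Fields(msg)
	if len(f) != 3 || (f[0] != "MOVED" && f[0] != "ASK") {
		return redirect{}, false
	}
	slot, err := strconv.Atoi(f[1])
	if err != nil || slot < 0 || slot >= 16384 {
		return redirect{}, false
	}
	return redirect{kind: f[0], slot: slot, addr: f[2]}, true
}

// needsFullRefresh follows the advice quoted above: a MOVED triggers a full
// slot-map reload, while ASK is a one-off redirect that leaves the map alone.
func needsFullRefresh(r redirect) bool { return r.kind == "MOVED" }

func main() {
	for _, msg := range []string{
		"MOVED 3999 127.0.0.1:6381",
		"ASK 3999 127.0.0.1:6381",
		"ERR unknown command",
	} {
		r, ok := parseRedirect(msg)
		fmt.Println(ok, r.slot, r.addr, needsFullRefresh(r))
	}
}
```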

Client can issue a CLUSTER SLOTS command to retrieve an array of slot ranges and the associated master and replica nodes serving the specified ranges.

CLUSTER SLOTS is not guaranteed to return ranges that cover the full 16384 slots if the cluster is misconfigured, so clients should initialize the slots configuration map filling the target nodes with NULL objects, and report an error if the user tries to execute commands about keys that belong to unassigned slots.

Before returning an error to the caller when a slot is found to be unassigned, the client should try to fetch the slots configuration again to check if the cluster is now configured properly.

And the Challenge of Redis Setup on Kubernetes

  • We have to use a headless Service for Redis because clients must reach individual Redis pods, while a normal Service load-balances TCP connections across pods behind a single ClusterIP. With a headless Service no ClusterIP is allocated, so we have to rely on Pod IPs.

  • Redis does not use DNS names to form a cluster; it uses IP addresses. So we cannot use the internal DNS name of the headless Service and have to use Pod IPs to form the Redis cluster.

  • In Kubernetes, a Pod IP is dynamic and can change when the pod restarts, so after a restart the cluster can become malformed and the restarted pod acts as a lost node.

@rueian
Collaborator

rueian commented Jun 20, 2024

I think these are already supported by rueidis: it internally discovers all other master nodes via MOVED/ASK/CLUSTER SLOTS and automatically connects to them, right?

Yes, it does.

Does the application layer need to run a long-running liveness PING probe for Redis Cluster failover?

No, no need to do that. rueidis already does it for you.

If PONG is returned, does it mean that all master nodes have been pinged and all connections are active?

No. It only means that one Redis instance is alive, because rueidis sends the PING to a randomly picked instance.

If you want to PING every instance of a Redis Cluster, you should do this:

for addr, c := range client.Nodes() {
	if err := c.Do(context.Background(), c.B().Ping().Build()).Error(); err != nil {
		log.Printf("PING %s failed: %v", addr, err)
	}
}

We currently only count the errors returned by rueidis client.Do() and consider them to be temporary errors, which in turn lead to temporary failures of our service callers. This solution is built on the assumption that rueidis can perfectly handle the restart and removal of any Redis pod in the Redis Cluster on Kubernetes in the failover scenario.

rueidis will do its best to follow Redis Cluster redirection and reconnection, but temporary failures may still happen due to various delays, including delays of the voting inside the Redis Cluster. If you find a better way to handle failover, please let us know.

@rueian
Collaborator

rueian commented Jun 20, 2024

Could you release an official version to fix this issue? Thank you very much.

Hi @liuguili2020,

We have removed the busy loop and refined the capped graceful shutdown in v1.0.40-alpha. Could you give it a try?

@qihuaheng

Hi @rueian
Thank you very much for your explanation, which makes me more confident in my technical solution. I will let you know the problems and proposals found during the testing process.
