PubSub: another Receive() with hooks #621
Comments
Hi @DriverX, Thank you for raising the issue. I do think the current Does the |
I wrote about memory leak based on this point
And again for a better understanding of my case =) I have 2 apps
And these apps work with 2 Redis clusters:
Now some pseudocode executor
runner
This code is race condition safe:
But imagine if
|
Sounds good! |
Hi @DriverX, Thank you for your detailed explanation. If you are using Redis 6 or newer, I think there is a simpler solution that leverages the invalidation notifications of client-side caching and needs no pubsub cluster:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	var mu sync.Mutex
	// channels maps a key to a channel that is closed when the key is invalidated.
	channels := make(map[string]chan struct{})
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"}, // assumed local address, adjust as needed
		OnInvalidations: func(messages []rueidis.RedisMessage) {
			mu.Lock()
			defer mu.Unlock()
			for _, message := range messages {
				key, _ := message.ToString()
				if ch, ok := channels[key]; ok {
					delete(channels, key)
					close(ch)
				}
			}
		},
	})
	if err != nil {
		panic(err)
	}
	key := "task_result:ooxx"
RETRY:
	// Register the waiter before reading, so no invalidation can be missed.
	ch := make(chan struct{})
	mu.Lock()
	channels[key] = ch
	mu.Unlock()
	result, err := client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	if rueidis.IsRedisNil(err) {
		<-ch // wait until the key is written (and therefore invalidated)
		result, err = client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	} else {
		mu.Lock()
		delete(channels, key)
		mu.Unlock()
	}
	if rueidis.IsRedisNil(err) {
		goto RETRY
	}
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}
```
|
We use Redis 7.0 in production, and this is a very interesting suggestion. I will think about it. For now, though, I would like to keep the pubsub scheme, because a big piece of our infrastructure is based on this pattern and unfortunately not all clients support client-side caching. |
I ran into a similar problem when trying to use Redis pubsub to implement the request feature from nats.io, e.g. in service B In service A In service A, I now need to ensure that step 2 happens after step 1. But client.Receive is blocking, so I cannot guarantee that. Could client.Receive accept a function that it runs once it has subscribed to the channel? |
Hi @TommyLeng, We don’t have an ETA on this feature, but I do hope we and a Talking about the case of disconnection, it is also the reason why using Redis PubSub to implement asynchronous RPC is generally not recommended: messages can be lost in this case. A recommended way is to use BLPOP and LPUSH instead. |
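The difference between the two delivery models mentioned above can be illustrated with a small in-memory sketch. Everything here (toyServer, publish, subscribe, lpush, lpop) is a hypothetical stand-in written for illustration, not Redis or rueidis code, and lpop is a non-blocking simplification of BLPOP. It only shows that a published message reaches the subscribers present at that moment, while a pushed list element waits for a late consumer.

```go
package main

import "fmt"

// toyServer imitates two delivery models in memory. It is NOT Redis.
type toyServer struct {
	subs  map[string][]chan string // pub/sub: current subscribers per channel
	lists map[string][]string      // lists: elements stay until popped
}

func newToyServer() *toyServer {
	return &toyServer{subs: map[string][]chan string{}, lists: map[string][]string{}}
}

// subscribe registers a buffered channel for future messages only.
func (s *toyServer) subscribe(channel string) chan string {
	ch := make(chan string, 8)
	s.subs[channel] = append(s.subs[channel], ch)
	return ch
}

// publish delivers to whoever is subscribed right now and returns that count.
func (s *toyServer) publish(channel, msg string) int {
	n := 0
	for _, ch := range s.subs[channel] {
		ch <- msg
		n++
	}
	return n
}

// lpush prepends an element to the list stored at key.
func (s *toyServer) lpush(key, v string) {
	s.lists[key] = append([]string{v}, s.lists[key]...)
}

// lpop removes and returns the head of the list, if there is one.
func (s *toyServer) lpop(key string) (string, bool) {
	l := s.lists[key]
	if len(l) == 0 {
		return "", false
	}
	s.lists[key] = l[1:]
	return l[0], true
}

// lateConsumerDemo sends a reply before the consumer is ready, once per model.
func lateConsumerDemo() (delivered, pending int, popped string, ok bool) {
	s := newToyServer()
	delivered = s.publish("reply:42", "done") // nobody is subscribed yet
	late := s.subscribe("reply:42")           // subscribing afterwards is too late
	pending = len(late)
	s.lpush("reply:42", "done") // the list keeps the reply
	popped, ok = s.lpop("reply:42")
	return
}

func main() {
	fmt.Println(lateConsumerDemo())
}
```

In this toy model the published reply is lost, while the pushed one is still there when the consumer arrives.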
Hi @DriverX Are you still tackling this issue? If not, I’d be happy to jump in |
We use short-lived pubsub channels, and the typical case is:
Sometimes this cycle is very short.
Problem
client.Receive()
If I use the first pattern from the docs, via client.Receive(), I can't guarantee that a subsequent UNSUBSCRIBE call is executed strictly after the SUBSCRIBE. A possible sequence:
1. client.Receive() with SUBSCRIBE is started in another goroutine
2. client.Do() is called with the UNSUBSCRIBE command
3. SUBSCRIBE wakes up too late and registers the subscribe handler in internal structures forever, and the handler never receives the unsubscribe message
If I use the alternative pattern with a dedicated client, I have another problem: too many open connections, one connection per channel. We have hundreds or even thousands of channels concurrently, and no connection pool can handle this or ensure predictable behavior.
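The SUBSCRIBE/UNSUBSCRIBE interleaving described above can be replayed deterministically with a toy handler table. This is only a sketch: registry and its methods are hypothetical names that model "internal structures" in the abstract, and they say nothing about how rueidis actually stores handlers.

```go
package main

import "fmt"

// registry is a toy model of a client's internal table of subscribe handlers.
type registry struct {
	handlers map[string]func(string)
}

// subscribe registers a handler for a channel.
func (r *registry) subscribe(channel string, h func(string)) {
	r.handlers[channel] = h
}

// unsubscribe removes the handler and reports whether one was registered.
func (r *registry) unsubscribe(channel string) bool {
	_, ok := r.handlers[channel]
	delete(r.handlers, channel)
	return ok
}

// lateSubscribeLeak replays the bad ordering: UNSUBSCRIBE first, SUBSCRIBE second.
func lateSubscribeLeak() (removed bool, leaked int) {
	r := &registry{handlers: map[string]func(string){}}
	// The unsubscribing caller runs first because the subscribing
	// goroutine has not been scheduled yet, so there is nothing to remove.
	removed = r.unsubscribe("task:1")
	// The subscribing goroutine finally runs and registers a handler
	// that no later call will ever remove.
	r.subscribe("task:1", func(string) {})
	return removed, len(r.handlers)
}

func main() {
	fmt.Println(lateSubscribeLeak())
}
```

With this ordering the unsubscribe is a no-op and one handler stays registered, which is the leak the issue is about.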
Reproduce problem
This code stuck on
Solutions
I think client.Receive() could have an option to wait until the subscription succeeds and to publish that knowledge somewhere around pipe.go#L691.
Or another option: a new version of client.Receive() with an additional callback function that is called with a PubSubSubscription when the (s|p)subscribe response is received from Redis. For example:
I will try to make a PR with a new additional method.
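To make the callback proposal concrete, here is a minimal sketch in plain Go that needs no Redis server. The broker type and the receiveWithHook method are hypothetical stand-ins invented for this illustration; they are not rueidis APIs and not the method from the eventual PR. The point is only the ordering guarantee: the caller unsubscribes after the hook has fired, so the subscription is always found.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a toy in-memory stand-in for a pub/sub connection.
type broker struct {
	mu   sync.Mutex
	subs map[string]chan string
}

func newBroker() *broker { return &broker{subs: make(map[string]chan string)} }

// receiveWithHook (hypothetical) registers the subscription, calls
// onSubscribed once registration is done, then blocks delivering
// messages until the channel is unsubscribed.
func (b *broker) receiveWithHook(channel string, onSubscribed func(), onMessage func(string)) {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[channel] = ch
	b.mu.Unlock()
	onSubscribed()
	for msg := range ch {
		onMessage(msg)
	}
}

// publish hands msg to the subscriber, if any; it drops msg when the buffer is full.
func (b *broker) publish(channel, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.subs[channel]; ok {
		select {
		case ch <- msg:
		default:
		}
	}
}

// unsubscribe removes the subscription and reports whether it existed.
func (b *broker) unsubscribe(channel string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.subs[channel]
	if ok {
		delete(b.subs, channel)
		close(ch) // ends the range loop in receiveWithHook
	}
	return ok
}

// demo runs one short subscribe, receive, unsubscribe cycle.
func demo() (got string, removed bool) {
	b := newBroker()
	subscribed := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		b.receiveWithHook("task:1",
			func() { close(subscribed) },
			func(m string) { got = m })
	}()
	<-subscribed // the subscription is registered from here on
	b.publish("task:1", "result")
	removed = b.unsubscribe("task:1")
	<-done // wait for the receiver to drain and exit
	return got, removed
}

func main() {
	fmt.Println(demo())
}
```

Because the caller blocks on the hook before unsubscribing, the cycle can be arbitrarily short without leaving a handler behind.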