Fixing read race condition during pubsub #1737
Being able to start polling without making a subscription would be nice - I ran into this limitation recently (although in aioredis). An immediate problem I can see with this is that I feel like this probably still has a race somewhere, because by the time
Looking at it again, I think the race I was worried about can't happen - provided that there is just one thread calling (un)subscription functions ("main thread") and one thread using
When I think there might still be a bug where
Codecov Report
@@ Coverage Diff @@
## master #1737 +/- ##
==========================================
- Coverage 94.29% 94.26% -0.04%
==========================================
Files 74 75 +1
Lines 15696 15942 +246
==========================================
+ Hits 14801 15028 +227
- Misses 895 914 +19
Using issue #1740 I found a bug in this fix:
If the UNSUBSCRIBE response is received before the PING response, get_message will poll the unsubscribe response and clear the subscribed_event flag; calling get_message again will then return None until a new subscription is made. However, if we call subscribe again at this point, the b"redis-py-health-check" response is still queued on the socket, so when we run the health check within execute_command (since now self.subscribed == False), an error will occur. I will work on publishing a fix for both.
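The ordering problem described in this comment can be illustrated with a minimal sketch. It does not use redis-py: the deque below is a hypothetical stand-in for the socket, and the tuple shapes are made up for illustration. It only shows why a reply left unread on the socket gets in the way of whatever is read next.

```python
from collections import deque

HEALTH_CHECK_REPLY = b"redis-py-health-check"

# Hypothetical stand-in for the socket: replies are read in arrival order.
socket_replies = deque()

# Both an UNSUBSCRIBE and a health-check PING are in flight.
socket_replies.append(("unsubscribe", b"foo"))       # arrives first
socket_replies.append(("pong", HEALTH_CHECK_REPLY))  # arrives second

# get_message() reads the unsubscribe reply; the object is now unsubscribed
# and stops reading, so the health-check reply stays queued.
consumed = socket_replies.popleft()

# A later subscribe triggers a new health check. The next read returns the
# stale reply instead of the reply to the new request.
next_reply = socket_replies.popleft()
is_stale = next_reply[1] == HEALTH_CHECK_REPLY
```

Under these assumptions, is_stale ends up True, which is the situation the fix has to avoid by discarding leftover health-check replies before the next command.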
I think the overall approach looks safe now, and I'm unable to crash it with my tests. I've made a suggestion that will fix a new timing-dependent crash.
It is unfortunate that the sequence [subscribe, get_message*, unsubscribe, sleep, subscribe] will no longer benefit from a health check on the later subscribe. In the application where I run into these issues, I have a wrapper class that keeps one permanent PubSub object around and uses it when it wants to wait for some message to be published.
Added a new solution so we can run more than one health check from execute_command. clean_health_check_responses is called from execute_command before sending the command or initiating a health check, and only if the connection is not subscribed (not self.subscribed).
I couldn't find a scenario in which we'll clean a response that isn't a health check response. However, I raise an exception in case it does happen, to make debugging easier.
@bmerry, @chayim I would appreciate a look if you have time.
…d clear on unsubscription
…m the execute_command method only in the first command execution.
…ot the connection isn't subscribed
0e8b7ee to d0a95fa
I think the design looks good now. Unfortunately I need to get my own work all sorted out before I go on holiday later this week, so I won't have time to thoroughly test this or to do further reviewing.
# Set the subscribed_event flag to True
self.subscribed_event.set()
# Clear the health check counter
self.health_check_response_counter = 0
Should this be done at the end of clean_health_check_response?
I don't want to put it there because the following scenario is possible:
1. p.subscribe("foo")
2. A health check is performed.
3. p.unsubscribe("foo")
4. The health check response still hasn't been received.
5. p.unsubscribe("foo")
6. clean_health_check_response is called by the unsubscribe command; the health check response hasn't arrived yet, so it exits the loop because the TTL runs out.
7. Only now is the health check response received.
8. p.subscribe() is called. self.subscribed is still False, so a health check will be performed, and we should clean the existing health check response before we continue.

If we add 'health_check_response_counter = 0' at the end of clean_health_check_response, we will clear the counter in step 6, so we won't be able to clean the response from the socket in step 8.
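A minimal sketch of the scenario above, assuming a counter of outstanding health-check replies and a deque standing in for the socket. SketchConnection, send_health_check, the reset_counter flag and run_scenario are hypothetical names used only for illustration; this is not the PR's actual code, and the TTL loop is reduced to "drain whatever has already arrived".

```python
from collections import deque

HEALTH_CHECK_REPLY = b"redis-py-health-check"


class SketchConnection:
    """Hypothetical stand-in for a pubsub connection (not redis-py code)."""

    def __init__(self):
        self.socket = deque()                   # replies that have arrived
        self.health_check_response_counter = 0  # replies still expected

    def send_health_check(self):
        self.health_check_response_counter += 1

    def clean_health_check_responses(self, reset_counter):
        # Drain the health-check replies that have already arrived.
        while self.health_check_response_counter > 0 and self.socket:
            reply = self.socket.popleft()
            if reply != HEALTH_CHECK_REPLY:
                # Fail loudly if something else would be discarded.
                raise RuntimeError(f"unexpected reply: {reply!r}")
            self.health_check_response_counter -= 1
        if reset_counter:
            # The variant under discussion: reset at the end of the clean-up.
            self.health_check_response_counter = 0


def run_scenario(reset_counter):
    conn = SketchConnection()
    conn.send_health_check()                          # step 2
    conn.clean_health_check_responses(reset_counter)  # step 6: nothing arrived yet
    conn.socket.append(HEALTH_CHECK_REPLY)            # step 7: late reply
    conn.clean_health_check_responses(reset_counter)  # step 8: before the new health check
    return list(conn.socket)


stale_with_reset = run_scenario(reset_counter=True)
stale_without_reset = run_scenario(reset_counter=False)
```

In this sketch, resetting the counter inside the clean-up leaves the late reply on the socket, while keeping the counter lets step 8 remove it.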
Using the existing socket timeout throughout?
Signed-off-by: Andrew-Chen-Wang <[email protected]>
Pull Request check-list
Please make sure to review and check all of these items:
Does $ tox pass with this change (including linting)?
NOTE: these things are not required to open a PR and can be done afterwards / while the PR is open.
Description of change
closes #1720
closes #1740
closes #1733
Another implementation to #1720 (first impl: #1733)
In this PR I'm adding an option to call pubsub's get_message() method without subscribing first.
If get_message() is called and no channel/pattern is subscribed, the method returns None without trying to read from the connection.
When a timeout is passed and no channels are subscribed yet, get_message() waits for whichever comes first: a subscription is made or the timeout expires.
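The waiting behaviour described above can be sketched with a threading.Event. SketchPubSub is a hypothetical class and its list-backed queue stands in for the connection; the real change in this PR may differ in its details.

```python
import threading


class SketchPubSub:
    """Hypothetical stand-in for a PubSub object (not redis-py's class)."""

    def __init__(self):
        self.subscribed_event = threading.Event()
        self._queue = []  # stands in for messages readable from the connection

    def subscribe(self, channel):
        self._queue.append({"type": "subscribe", "channel": channel})
        self.subscribed_event.set()

    def get_message(self, timeout=0.0):
        # Not subscribed: wait for a subscription or for the timeout,
        # whichever comes first, without touching the connection.
        if not self.subscribed_event.is_set():
            if not self.subscribed_event.wait(timeout):
                return None
        return self._queue.pop(0) if self._queue else None


pubsub = SketchPubSub()

# Nothing is subscribed, so this returns None once the timeout expires.
first = pubsub.get_message(timeout=0.01)

# Another thread subscribes shortly afterwards; the waiting call wakes up
# as soon as the event is set instead of sleeping for the full timeout.
timer = threading.Timer(0.05, pubsub.subscribe, args=("foo",))
timer.start()
second = pubsub.get_message(timeout=2.0)
timer.join()
```

Using an event keeps the polling thread off the socket until there is something to read, which is the property the description relies on.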