
Fixing read race condition during pubsub #1737

Merged - 11 commits merged into redis:master on Dec 23, 2021

Conversation

@barshaul (Contributor) commented Nov 22, 2021

Pull Request check-list

Please make sure to review and check all of these items:

  • [x] Does $ tox pass with this change (including linting)?
  • [x] Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • [x] Is the new or changed code fully tested?
  • [x] Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

closes #1720
closes #1740
closes #1733

Another implementation to #1720 (first impl: #1733)
In this PR I'm adding an option to call pubsub's get_message() without subscribing first.
If get_message() is called while no channel/pattern is subscribed, the method returns None without trying to read from the connection.
If a timeout is passed and no channels are subscribed yet, get_message() waits until either the first subscription is made or the timeout expires.
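The behaviour described above can be sketched with a stdlib threading.Event. This is a toy model with hypothetical names, not the actual redis-py implementation:

```python
import threading


class SubscriptionGate:
    """Toy model of the wait-for-subscription gate described above (not redis-py code)."""

    def __init__(self):
        self.subscribed_event = threading.Event()

    def subscribe(self):
        # Called by the main thread once a SUBSCRIBE command is issued.
        self.subscribed_event.set()

    def get_message(self, timeout=0.0):
        if not self.subscribed_event.is_set():
            if timeout > 0:
                # Block until a subscription is made or the timeout expires.
                if not self.subscribed_event.wait(timeout):
                    return None
            else:
                # No subscriptions and no timeout: return immediately.
                return None
        return "message"  # placeholder for reading from the connection


gate = SubscriptionGate()
print(gate.get_message())           # None: nothing subscribed, returns without reading
gate.subscribe()
print(gate.get_message(timeout=1))  # "message": subscribed, so the read proceeds
```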

@bmerry (Contributor) commented Nov 22, 2021

Being able to start polling without making a subscription would be nice - I ran into this limitation recently (although in aioredis).

An immediate problem I can see with this is that get_message can take up to double the timeout: it may first wait just under timeout for a subscription, then another full timeout for a message (plus up to another 0.25s because of the polling in wait_for_subscription).
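One way to avoid the doubled wait (a sketch under assumed names, not the PR's code) is to derive both waits from a single deadline. `subscribed_event` and `_read_message` below are hypothetical stand-ins for the pubsub internals:

```python
import time


def get_message_with_deadline(pubsub, timeout):
    """Sketch: cap the total wait (subscription wait + socket read) at one timeout.

    `pubsub` is assumed to expose `subscribed_event` (a threading.Event)
    and a low-level `_read_message(timeout)`; both names are hypothetical.
    """
    deadline = time.monotonic() + timeout
    # Wait for a subscription, but only up to the shared deadline.
    if not pubsub.subscribed_event.wait(max(0.0, deadline - time.monotonic())):
        return None
    # Spend only the remaining time reading from the connection.
    remaining = max(0.0, deadline - time.monotonic())
    return pubsub._read_message(timeout=remaining)
```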

I feel like this probably still has a race somewhere, because by the time wait_for_subscription returns, the main thread might already have asked to unsubscribe. It may be that works as long as some timing assumptions hold (e.g. round-trip time to the server is less than the health check interval) but breaks if the server suffers high latency. I'll poke at it some more and see if I can produce an explicit example.

@bmerry (Contributor) commented Nov 22, 2021

Looking at it again, I think the race I was worried about can't happen - provided that there is just one thread calling (un)subscription functions ("main thread") and one thread using get_message/listen (poller thread). Here's my logic, in case anyone wants to double-check. I consider three possible states:

  1. Subscribed: there are subscriptions. self.subscribed is true.
  2. Semi-subscribed: we've issued UNSUBSCRIBE commands for all subscriptions, but not yet processed the responses. self.subscribed is true.
  3. Unsubscribed: we've issued UNSUBSCRIBE commands for all subscriptions and processed the responses. self.subscribed is false.

When get_message passes the wait_for_subscribed check, we're in either the subscribed or semi-subscribed state. The main thread can trigger oscillations between these two states, but cannot cause a transition to unsubscribed on its own (the poller thread does that in handle_message by processing the unsubscription response). So when get_message is reading the socket, we are guaranteed not to be in the unsubscribed state. On the other hand, execute_command only runs the health check in the unsubscribed state, and remains in that state for the duration of the health check.
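The three states and the per-thread transitions in the argument above can be modelled as a small table (illustrative only; these names do not exist in redis-py):

```python
from enum import Enum, auto


class PubSubState(Enum):
    SUBSCRIBED = auto()       # there are subscriptions; self.subscribed is True
    SEMI_SUBSCRIBED = auto()  # UNSUBSCRIBE sent, replies not yet processed
    UNSUBSCRIBED = auto()     # UNSUBSCRIBE replies processed; self.subscribed is False


# Which thread may drive each transition, per the reasoning above:
TRANSITIONS = {
    ("main", PubSubState.SUBSCRIBED): PubSubState.SEMI_SUBSCRIBED,      # sends UNSUBSCRIBE
    ("main", PubSubState.SEMI_SUBSCRIBED): PubSubState.SUBSCRIBED,      # re-subscribes
    ("main", PubSubState.UNSUBSCRIBED): PubSubState.SUBSCRIBED,         # subscribes again
    ("poller", PubSubState.SEMI_SUBSCRIBED): PubSubState.UNSUBSCRIBED,  # processes reply
}

# The key invariant: no ("main", ...) entry maps into UNSUBSCRIBED, so the
# main thread alone can never move the connection into the unsubscribed state
# while the poller thread is reading the socket.
main_targets = {v for (who, _), v in TRANSITIONS.items() if who == "main"}
assert PubSubState.UNSUBSCRIBED not in main_targets
```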

I think there might still be a bug where PubSub.check_health can run in semi-subscribed state, but I'll file that separately if I manage to reproduce it.

@codecov-commenter commented Nov 23, 2021

Codecov Report

Merging #1737 (3cf820f) into master (748c8d1) will decrease coverage by 0.03%.
The diff coverage is 92.06%.


@@            Coverage Diff             @@
##           master    #1737      +/-   ##
==========================================
- Coverage   94.29%   94.26%   -0.04%     
==========================================
  Files          74       75       +1     
  Lines       15696    15942     +246     
==========================================
+ Hits        14801    15028     +227     
- Misses        895      914      +19     
Impacted Files Coverage Δ
redis/client.py 89.46% <85.29%> (-0.38%) ⬇️
tests/test_pubsub.py 99.75% <100.00%> (+0.01%) ⬆️
redis/__init__.py 90.47% <0.00%> (-9.53%) ⬇️
redis/commands/json/commands.py 88.88% <0.00%> (-6.07%) ⬇️
tests/conftest.py 90.14% <0.00%> (-2.75%) ⬇️
redis/commands/core.py 89.92% <0.00%> (-0.10%) ⬇️
redis/cluster.py 90.23% <0.00%> (-0.07%) ⬇️
setup.py 0.00% <0.00%> (ø)
tests/test_json.py 100.00% <0.00%> (ø)
... and 7 more


@barshaul (Contributor, Author) commented:
Using issue #1740 I found a bug in this fix:

#!/usr/bin/env python3

import threading
import time

from redis import Redis


def poll(ps, event=None):
    print(ps.get_message(timeout=5))
    event.wait()
    while True:
        message = ps.get_message(timeout=5)
        if message is not None:
            print(message)
        else:
            break

def main():
    r = Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    ps.subscribe("foo")

    event = threading.Event()
    poller = threading.Thread(target=poll, args=(ps, event))
    poller.start()

    time.sleep(2)
    event.set()
    ps.unsubscribe("foo")
    time.sleep(1)
    ps.subscribe("foo")
    poller.join()

while True:
    main()

If the UNSUBSCRIBE response is received before the PING response: get_message will read the unsubscribe response, set the subscribed_event flag to False, and from then on return None until a new subscription is made. However, at this point the b"redis-py-health-check" response is still queued on the socket. So if we call subscribe again, execute_command runs a health check first (since now self.subscribed == False) and an error occurs:
redis.exceptions.ConnectionError: Bad response from PING health check
because the response it reads is b"redis-py-health-check".
This issue can be fixed by running the health check from execute_command only on the first command execution.

I will work on publishing a fix for both.

@barshaul (Contributor, Author) commented:
@bmerry A fix was added for #1740 and to the bug I mentioned in the comment above.

@bmerry (Contributor) left a comment

I think the overall approach looks safe now, and I'm unable to crash it with my tests. I've made a suggestion that will fix a new timing-dependent crash.

It is unfortunate that the sequence [subscribe, get_message*, unsubscribe, sleep, subscribe] will no longer benefit from a health check on the later subscribe. In the application where I run into these issues, I have a wrapper class that keeps one permanent PubSub object around and uses it when it wants to wait for some message to be published.

@barshaul (Contributor, Author) commented Nov 30, 2021

Added a new solution so we can run more than one health check from execute_command.

clean_health_check_responses will be called from execute_command, before sending the command or initiating a health check, only if not self.subscribed.
not self.subscribed can be true in two cases:

  1. The first time we subscribe: we know for sure that the socket is clean, so clean_health_check_responses will return immediately.
  2. In a subscribe -> unsubscribe -> subscribe sequence. Just after 'unsubscribe' was executed, there are two options:
    • get_message() didn't execute a health check before pulling the 'unsubscribe' response: it only had the 'unsubscribe' response to pull, and then it set subscribed to False. If 'subscribe' is called before the 'unsubscribe' response is processed, self.subscribed is still True, so execute_command will not try to clean the socket. If 'subscribe' is called after subscribed was set to False, execute_command will call clean_health_check_responses, which returns immediately since the socket is empty (get_message() hasn't done a health check). Either way, execute_command can perform a health check with no problems.
    • get_message() did execute a health check before pulling the 'unsubscribe' response. In this case there are two possible orders for the responses on the socket:
      1. The health check response is queued before the unsubscribe response. The health check response will be processed first and PubSub.parse_response will ignore it, then retrieve the unsubscribe response. 'subscribe' can now execute a health check with no issue.
      2. The unsubscribe response is queued before the health check response. get_message() will set the subscribed event to False and stop pulling messages, leaving the health check response unread on the socket. At this stage, if subscribe is executed, self.subscribed is False, so clean_health_check_responses will pull the health check response and return. Then a health check can be issued by execute_command.

I couldn't find a scenario in which we'd clean a response that isn't a health check response. However, I added exception throwing in case it does happen, to make debugging easier.
@bmerry, please verify and let me know what you think.
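A rough sketch of the cleaning logic described above, under assumptions: the real clean_health_check_responses in redis-py differs, and `can_read`/`read_response` here are simplified stand-ins for the connection interface:

```python
HEALTH_CHECK_MESSAGE = b"redis-py-health-check"


def clean_health_check_responses(conn, pending, ttl=10):
    """Drain up to `pending` leftover health-check replies from the socket.

    `conn.can_read(timeout)` and `conn.read_response()` are hypothetical
    stand-ins for the connection API. Anything other than a health-check
    reply raises, to make unexpected states easy to debug.
    """
    while pending > 0 and ttl > 0:
        if conn.can_read(timeout=conn.socket_timeout):
            response = conn.read_response()
            if response != HEALTH_CHECK_MESSAGE:
                raise RuntimeError(
                    "A non health check response was cleaned by "
                    f"execute_command: {response!r}"
                )
            pending -= 1
        ttl -= 1
    # Any replies still outstanding (e.g. delayed by the server) stay pending.
    return pending
```

With this shape, the subscribe -> unsubscribe -> subscribe sequence drains the stale health-check reply before execute_command issues its own PING.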

@barshaul (Contributor, Author) commented Dec 2, 2021

> (quoting the Nov 30 comment above)

@bmerry, @chayim I would appreciate a look if you have time.

@bmerry (Contributor) commented Dec 6, 2021

@bmerry, @chayim I would appreciate a look if you have time.

I'll take a look now.

@bmerry (Contributor) left a comment

I think the design looks good now. Unfortunately I need to get my own work all sorted out before I go on holiday later this week, so I won't have time to thoroughly test this or to do further reviewing.

# Set the subscribed_event flag to True
self.subscribed_event.set()
# Clear the health check counter
self.health_check_response_counter = 0
Contributor:

Should this be done at the end of clean_health_check_response?

@barshaul (Contributor, Author) replied:

I don't want to put it there because the following scenario is possible:

  1. p.subscribe("foo")
  2. a health check is performed
  3. p.unsubscribe("foo")
  4. the health check response still hasn't been received
  5. p.unsubscribe("foo")
  6. clean_health_check_responses is called by the unsubscribe command; the health check response hasn't arrived yet, so it exits the loop when the TTL runs out
  7. the health check response is only now received
  8. p.subscribe() is called - self.subscribed is still False, so a health check will be performed, and we should clean the existing health check response before we continue
  9. If we reset the counter at the end of clean_health_check_responses, we would clear it in step 6, so we won't be able to clean the response from the socket in step 8

@chayim (Contributor) left a comment

Using the existing socket timeout throughout?

@chayim chayim added the bug Bug label Dec 23, 2021
@chayim chayim changed the title Resolving read race condition between pubsub's get_message() and execute_command() Fixing read race condition during pubsub Dec 23, 2021
@chayim chayim merged commit d6cb997 into redis:master Dec 23, 2021
@chayim chayim deleted the health_check_new branch December 23, 2021 09:42
Andrew-Chen-Wang added a commit to aio-libs-abandoned/aioredis-py that referenced this pull request Dec 24, 2021
Signed-off-by: Andrew-Chen-Wang <[email protected]>
Successfully merging this pull request may close these issues: "Another race condition in health checks and pubsub" and "Race condition in handling health checks for pub-sub".