Race condition for the Socket.io background thread like in example code #493

Closed
mrwillis opened this issue Jun 29, 2017 · 2 comments

mrwillis commented Jun 29, 2017

Hi all,

I was looking at the example (in the example folder) that starts a background thread, performs some operation every n seconds, and emits the result to the client. I've implemented it, but I'm running into what looks like a threading issue: multiple background threads are spawned by the @socketio.on('connect') handler below. Here's what I think is happening:

  1. Client A connects to the server
  2. Some time later, client B connects to the server

Both clients work fine and receive data every n seconds, as specified.

  1. Now imagine gunicorn is restarted.
  2. Both Client A and Client B get a thread from gunicorn and both try to run the @socketio.on('connect') handler, which creates the background thread. But now there are two background threads emitting the same message every n seconds!

How can I fix this? I've put snippets of the relevant sections below. I was thinking of spawning the thread once when the server starts, but I don't know whether that would work if no socket.io clients exist...

background_marketdata_thread = None
rooms = list()


def background_marketdata_broadcast():
    """
    Emits market data every 5 seconds for every active ETF
    :return:
    """
    while True:
        socketio.sleep(5)
        all_market_data = queries.get_multiple_market_data(rooms)
        app.logger.warning('I am a thread!')
        if len(all_market_data.keys()) != 0:
            for etf in rooms:
                try:
                    if etf in all_market_data:
                        socketio.emit('mkt_data', all_market_data[etf], room=etf)
                except Exception as e:
                    app.logger.exception(e)


@utils.authenticated_only
@socketio.on('join')
def join(data):
    """
    Joins a room to subscribe to data for that ETF in the dashboard
    :param data:
    :return:
    """
    global rooms
    if data['etf'] not in rooms:
        rooms.append(data['etf'])
    join_room(data['etf'])


@utils.authenticated_only
@socketio.on('connect')
def emit_market_data():
    global background_marketdata_thread
    if background_marketdata_thread is None:
        background_marketdata_thread = socketio.start_background_task(target=background_marketdata_broadcast)

And my logs (note the timestamps!) ...

2017-06-29 19:00:19,303 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:19,650 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:19,658 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:24,332 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:24,727 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:24,744 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:29,352 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:29,748 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:29,766 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:34,373 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:34,764 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:34,779 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:39,387 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:39,781 - basezero_flask - WARNING - I am a thread!
2017-06-29 19:00:39,796 - basezero_flask - WARNING - I am a thread!
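
One way to read the excerpt above: the loop sleeps 5 seconds per pass, so a single background task should log roughly once per 5-second window. The sketch below counts entries per window, measured from the first entry. The helper name `entries_per_interval` is hypothetical and the timestamps are copied from the log lines above; a count above one per window would suggest that more than one loop is running.

```python
from collections import Counter
from datetime import datetime

# Timestamps copied from the log excerpt above.
LOG_TIMESTAMPS = [
    "2017-06-29 19:00:19,303", "2017-06-29 19:00:19,650", "2017-06-29 19:00:19,658",
    "2017-06-29 19:00:24,332", "2017-06-29 19:00:24,727", "2017-06-29 19:00:24,744",
    "2017-06-29 19:00:29,352", "2017-06-29 19:00:29,748", "2017-06-29 19:00:29,766",
    "2017-06-29 19:00:34,373", "2017-06-29 19:00:34,764", "2017-06-29 19:00:34,779",
    "2017-06-29 19:00:39,387", "2017-06-29 19:00:39,781", "2017-06-29 19:00:39,796",
]


def entries_per_interval(timestamps, interval_seconds=5):
    """Count log entries in each interval, measured from the first entry."""
    parsed = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S,%f") for t in timestamps]
    first = parsed[0]
    buckets = Counter(
        int((p - first).total_seconds() // interval_seconds) for p in parsed
    )
    return [buckets[k] for k in sorted(buckets)]


if __name__ == "__main__":
    print(entries_per_interval(LOG_TIMESTAMPS))  # [3, 3, 3, 3, 3]
```

Three entries in every window is consistent with three copies of the loop running at once, each on its own 5-second cycle.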

My versions:

eventlet~=0.21.0
flask-socketio~=2.8.6
gunicorn~=19.7.1
Flask~=0.12.1

I'm also running behind nginx as a reverse proxy, but it only serves static files. And my Gunicorn conf:

workers = 1
worker_class = 'eventlet'
error_logfile = '/vagrant/server/gunicorn_error_log.log'
access_logfile = '/vagrant/server/gunicorn_access_log.log'
daemon = True

Thanks for your help :)

miguelgrinberg (Owner) commented

Yes, I have actually been sloppy about this, because it is meant to be an example only.

The correct way to do this is to use a Lock to create a critical section. This ensures that if two clients are running the connect handler at the same time, only one will start the thread.

Here is a quick example; I will update the example application as well:

from threading import Lock

background_marketdata_thread = None
background_marketdata_lock = Lock()

@utils.authenticated_only
@socketio.on('connect')
def emit_market_data():
    global background_marketdata_thread
    
    with background_marketdata_lock:
        if background_marketdata_thread is None:
            background_marketdata_thread = socketio.start_background_task(target=background_marketdata_broadcast)

miguelgrinberg self-assigned this Jun 29, 2017

mrwillis (Author) commented

Thanks for the quick reply @miguelgrinberg. Works like a charm.
