Example of Robust multi-channel consumer and publisher #13

Closed
thejuan opened this issue Apr 21, 2016 · 6 comments

Comments

@thejuan

thejuan commented Apr 21, 2016

What's the best way to reconnect when a channel errors?
Is it OK to call connection.open() to re-open a closed or errored connection? Same for channel.open()?
Is it thread-safe for multiple threads to try to re-open the connection on error?
Or do we need to pass around a connection wrapper that locks?

@eandersson
Owner

eandersson commented Apr 21, 2016

It should technically be safe to re-open, but you need to make sure that you only call connection.open() when the connection is in state CLOSED. If the connection is in state OPENING, the other threads need to wait until it has finished opening.

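def reconnect(self):
    # Assumes `from time import sleep` and that self.retries starts at 0.
    # Loop instead of recursing, so a long outage cannot exhaust the stack.
    while not self.connection.is_open:
        self.retries += 1
        if self.connection.is_closed:
            self.connection.open()
        # Back off one second per attempt, capped at 30 seconds.
        sleep(min(self.retries, 30))
    self.retries = 0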
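
As a rough sketch of the "connection wrapper that locks" idea from the question, the snippet below guards the re-open with a threading.Lock so that only one thread attempts it while the others wait. FakeConnection and LockedReconnector are hypothetical names invented for this illustration and are not part of amqpstorm; the fake only mimics the is_open / is_closed / open() surface used above. A real open() call may raise on failure, which this sketch does not handle.

```python
import threading
import time


class FakeConnection(object):
    """Hypothetical stand-in for a connection; opens on the Nth attempt."""

    def __init__(self, fail_times=2):
        self.is_open = False
        self.open_calls = 0
        self._fail_times = fail_times

    @property
    def is_closed(self):
        return not self.is_open

    def open(self):
        # Stay closed for the first `fail_times` calls, then succeed.
        self.open_calls += 1
        if self.open_calls > self._fail_times:
            self.is_open = True


class LockedReconnector(object):
    """Hypothetical wrapper: only one thread re-opens at a time."""

    def __init__(self, connection, max_delay=30, sleep=time.sleep):
        self.connection = connection
        self.max_delay = max_delay
        self._sleep = sleep
        self._lock = threading.Lock()

    def reconnect(self):
        # Threads that lose the race block on the lock; once they acquire
        # it, the connection is already open and they return immediately.
        with self._lock:
            retries = 0
            while not self.connection.is_open:
                retries += 1
                if self.connection.is_closed:
                    self.connection.open()
                if not self.connection.is_open:
                    self._sleep(min(retries, self.max_delay))
            return retries


if __name__ == '__main__':
    conn = FakeConnection(fail_times=2)
    # Skip real sleeping so the demo finishes instantly.
    wrapper = LockedReconnector(conn, sleep=lambda seconds: None)
    workers = [threading.Thread(target=wrapper.reconnect) for _ in range(8)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(conn.is_open, conn.open_calls)
```

Holding the lock for the whole retry loop keeps the sketch simple; the cost is that waiting threads stay blocked for as long as the outage lasts.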

Ideally though you would have a master thread that handles the connection, and the rest only handles the channels. If the connection goes down, the master thread re-connects, and the channels automatically re-open once the connection is established again.

I'll see if I can add a robust multi-threaded example.

@eandersson
Owner

A fairly straightforward example with 5 threaded consumers would look something like this.

import logging
import time
import threading

import amqpstorm
from amqpstorm import Connection

logging.basicConfig(level=logging.DEBUG)

LOGGER = logging.getLogger()


class Consumer(object):
    def __init__(self, max_retries=None):
        self.max_retries = max_retries
        self.connection = None

    def create_connection(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                self.connection = Connection('127.0.0.1', 'guest', 'guest')
                break
            except amqpstorm.AMQPError as why:
                LOGGER.warning(why)
                if self.max_retries and attempts > self.max_retries:
                    break
                time.sleep(min(attempts * 2, 30))
            except KeyboardInterrupt:
                break

    def start_consumers(self):
        if not self.connection:
            self.create_connection()

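        while True:
            # Start five consumer threads on the current connection.
            for index in range(5):
                thread = threading.Thread(target=self.create_consumer,
                                          name='Consumer #%d' % index)
                thread.daemon = True
                thread.start()

            # Monitor the connection. On failure, reconnect and fall back to
            # the outer loop so that the consumers are started again.
            while True:
                try:
                    self.connection.check_for_errors()
                except amqpstorm.AMQPError:
                    self.create_connection()
                    break
                time.sleep(1)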

    def create_consumer(self):
        try:
            channel = self.connection.channel()
            channel.queue.declare('simple_queue')
            channel.basic.consume(self, 'simple_queue', no_ack=False)
            channel.start_consuming(to_tuple=False)
            if not channel.consumer_tags:
                channel.close()
        except amqpstorm.AMQPError:
            pass
        except KeyboardInterrupt:
            pass

    def __call__(self, message):
        print("Message:", message.body, threading.current_thread())
        message.ack()


if __name__ == '__main__':
    CONSUMER = Consumer()
    CONSUMER.start_consumers()
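
For reference, the retry delay in create_connection grows linearly and is capped at 30 seconds, and with max_retries left at None the loop retries indefinitely. The small sketch below just spells out that schedule; backoff_delay is a hypothetical helper name used for illustration only, not something the example or the library defines.

```python
def backoff_delay(attempts, step=2, cap=30):
    """Seconds to wait before the next attempt: linear growth, capped."""
    return min(attempts * step, cap)


# Delays for attempts 1 through 17: 2, 4, 6, ... up to 30, then 30 onwards.
delays = [backoff_delay(attempt) for attempt in range(1, 18)]
print(delays)
```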

@eandersson
Owner

I added an example that shows my recommended method of handling multiple threaded consumers.
https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py

@thejuan
Author

thejuan commented Apr 22, 2016

Thanks. Looks good.
Why in all your examples do you do this?

if not self.channel.consumer_tags:
    self.channel.close()

@eandersson
Owner

That, my friend, is over-engineering on my part.

You can have multiple consumers on a single channel. That piece of code checks whether another thread is still consuming on the channel; if none is, it simply closes the channel.
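
To make that check concrete, here is a minimal sketch using a stand-in channel object. FakeChannel, its add_consumer / remove_consumer methods, and close_if_unused are all hypothetical names made up for this illustration; only the consumer_tags attribute and close() mirror what the snippet in the question uses.

```python
class FakeChannel(object):
    """Hypothetical stand-in that tracks consumer tags like a channel."""

    def __init__(self):
        self.consumer_tags = []
        self.is_open = True

    def add_consumer(self, tag):
        self.consumer_tags.append(tag)

    def remove_consumer(self, tag):
        self.consumer_tags.remove(tag)

    def close(self):
        self.is_open = False


def close_if_unused(channel):
    """Close the channel only when no consumer is registered on it.

    Returns True if the channel is closed after the check.
    """
    if not channel.consumer_tags:
        channel.close()
    return not channel.is_open


if __name__ == '__main__':
    channel = FakeChannel()
    channel.add_consumer('worker-a')
    channel.add_consumer('worker-b')

    channel.remove_consumer('worker-a')
    # 'worker-b' is still registered, so the channel stays open.
    print(close_if_unused(channel))

    channel.remove_consumer('worker-b')
    # No consumers are left, so the channel is closed.
    print(close_if_unused(channel))
```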

@thejuan
Author

thejuan commented Apr 22, 2016

Awesome. Thanks!
