Gracefully stopping consumers? #517

Open
lcarva opened this issue Aug 18, 2018 · 4 comments

@lcarva

lcarva commented Aug 18, 2018

I have a consumer that takes about 20 minutes to process a received message. During this period the consumer should not be stopped. If I need to deploy a new version of the consumer, for example, things become challenging.

I'd like a way to signal fedmsg to stop consuming new messages; wait for consumers to finish processing current messages; then finally stop fedmsg. Ideally a configurable max wait timeout would control this.

Before I delve into the code base, does this approach make sense?

@ralphbean
Contributor

@lcarva, it makes sense to me. @mprahl has wanted something similar for MBS (a maintenance mode to drain the system), so you may want to talk to him as well.

I figure your patch will need to touch moksha.hub primarily but may have to reach into fedmsg too.

@jeremycline
Member

The way we've handled this in fedora-messaging is to register signal handlers. I'd recommend that approach. Handle the signal by setting a "stop" flag, and if that flag is already set (e.g. the signal got sent twice) stop immediately.
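
A minimal sketch of that stop-flag pattern, assuming nothing about how fedora-messaging actually implements it. The names `stop_requested`, `request_stop`, and `install_handlers` are hypothetical and exist only for illustration:

```python
import signal
import sys
import threading

# Hypothetical module-level flag; not part of fedmsg or fedora-messaging.
stop_requested = threading.Event()


def request_stop(signum, frame):
    """First signal asks for a graceful stop; a second one exits at once."""
    if stop_requested.is_set():
        # The flag was already set, so the signal arrived twice: stop now.
        sys.exit(1)
    stop_requested.set()


def install_handlers(signals=(signal.SIGTERM, signal.SIGINT)):
    # signal.signal() may only be called from the main thread.
    for signum in signals:
        signal.signal(signum, request_stop)
```

The consuming code would then check `stop_requested.is_set()` between messages and decline to pick up new work once it is set.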

@lcarva
Author

lcarva commented Aug 20, 2018

@jeremycline, ah that's clever because I can do that on my consumer directly (I think) without having to mess with fedmsg/moksha. Thanks, I'll look into that.

@lcarva
Author

lcarva commented Aug 28, 2018

> The way we've handled this in fedora-messaging is to register signal handlers. I'd recommend that approach. Handle the signal by setting a "stop" flag, and if that flag is already set (e.g. the signal got sent twice) stop immediately.

I think I'm getting closer to this. I've added a signal handler like this in my consumer:

import logging
import signal
import time

import fedmsg.consumers

log = logging.getLogger(__name__)


class MyConsumer(fedmsg.consumers.FedmsgConsumer):
    def __init__(self, hub):
        ...
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        log.info('Received signal %d', signum)
        if signum != signal.SIGTERM:
            log.warning('Signal is unexpected, doing nothing')
            return

        # Do not consume more messages from UMB
        self.hub.unsubscribe(self._consume)

        # Finish up what's already in queue
        while not self.incoming.empty():
            # TODO: Break if configured timeout expires
            time.sleep(1)

        # TODO: Ensure that there are no active queue items being processed - not sure what to check

        # Disconnect from hub and do some cleanup (Yikes! this is ugly)
        self.stop()
        self.hub.stop()
        from moksha.hub.reactor import reactor
        reactor.stop()
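
One way the timeout TODO in the loop above might be filled in is with a deadline on a monotonic clock. This is only a sketch: `wait_until_empty`, `timeout`, and `poll_interval` are hypothetical names, and `incoming` is assumed to be any object with a `queue.Queue`-style `empty()` method:

```python
import queue
import time


def wait_until_empty(incoming, timeout, poll_interval=1.0):
    """Poll until `incoming` is empty or `timeout` seconds have passed.

    Returns True if the queue emptied in time, False if the wait timed out.
    """
    deadline = time.monotonic() + timeout
    while not incoming.empty():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
    return True
```

An empty queue only means nothing is waiting; a message that has already been taken off the queue may still be in the middle of processing.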

After calling reactor.stop(), it seems that the process waits for any currently running threads to finish executing before quitting, which is pretty close to what I want.

The only missing piece is checking for running threads before killing the hub/reactor. The reason is that I'd like to publish a message at the end of my consume method, and if I kill the hub/reactor first, no messages will be published. I'll keep digging to see if there's a way.
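
A possible way to know whether anything is still being processed is to have the consumer count its own in-flight messages rather than inspect threads. The sketch below is an assumption, not something fedmsg or moksha provides; `InFlightTracker`, `track`, and `wait_idle` are hypothetical names:

```python
import threading
from contextlib import contextmanager


class InFlightTracker:
    """Counts messages currently being processed (hypothetical helper)."""

    def __init__(self):
        self._count = 0
        self._cond = threading.Condition()

    @contextmanager
    def track(self):
        # Wrap the body of consume() in `with tracker.track():`.
        with self._cond:
            self._count += 1
        try:
            yield
        finally:
            with self._cond:
                self._count -= 1
                self._cond.notify_all()

    def wait_idle(self, timeout=None):
        """Block until nothing is in flight; False if `timeout` expires."""
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)
```

With the final publish inside the tracked block, `wait_idle()` would return only after that publish has been attempted, and the hub/reactor could be stopped afterwards. Python runs signal handlers in the main thread, so a long wait inside the handler blocks whatever else the main thread is doing.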
