Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add auto_ack mode, so it can be turned off to give application contro… #554

Closed
wants to merge 5 commits into from

Conversation

petersilva
Copy link
Contributor

This allows applications to control when message acknowledgements are sent. It is modelled on the same behaviour in
amqplib ( https://www.rabbitmq.com/confirms.html ) where the default behaviour has the library automatically acknowledge
messages when they are read by the application. Turning auto_ack off, allows the application to call ack with a message-id
to perform explicit acknowledgements when appropriate.

There are whole classes of applications that cannot use MQTT because of the lack of control over acknowledgements.
For reliablity reasons, some apps need to only acknowledge messages once processing on the recipient node is completed.
Otherwise all data received by a client, but not yet processed, is lost in the event of node failure.

The MQTT protocol itself is fine, It's just the library not exposing the option of user controlled acknowledgements. My understanding from reading this: #348 I gather the Java paho libraries already expose similar functionality. I think this method is cleaner, and is based on an existing analogous function in a comparable library.

@petersilva
Copy link
Contributor Author

The build travis failures are in areas of the code with nothing to do with my changes. Pre-existing issue.
I think I have signed the appropriate contributor agreement.

@ralight
Copy link
Contributor

ralight commented Jul 7, 2021

Hi @petersilva, sorry it's taken so long to get round to looking at this.

I think the idea is fine. I would also like it extended to include QoS 2, it feels incomplete otherwise. The point at which it should intercept is on receiving a PUBREL. The call to ack can cover both cases, we could check whether the mid is in the list of waiting QoS 2 messages and if so send a PUBCOMP, otherwise send a PUBACK. What do you think?

The commit is still coming up as not matching to the ECA - the email in the commit needs to match the address that you signed with. You can update and force push against this PR or make a new one if you would prefer.

@BBBSnowball
Copy link

I would also like it extended to include QoS 2, it feels incomplete otherwise.

That may be useful and PUBCOMP indeed feels like the PUBACK for QoS 2 (for Method A of MQTT 3.1.1, at least). However, I feel that it is less necessary than for QoS 1: The sender has sent the PUBREL message so it mustn't send the PUBLISH again (doing otherwise would violate the "at most once" guarantee). It could attempt a retry on the application layer but if it can do so without violating "at most once" why did it use QoS 2 at all?

MQTT 3.1.1 says that the receiver doesn't have to deliver the message before sending PUBREC/PUBCOMP. MQTT 5 adds that it should check certain things ("quota exceeded, authorization, etc."). This might be something that is better handled by the application. However, this would be better done before PUBREC and indeed MQTT 5 doesn't mention "Method A" anymore (i.e. call the handler after receiving PUBREL, which we do). "Method B" calls the handler before sending PUBREC but needs persistent storage so I guess we don't want to open that can of worms for this rather simple PR (or at all).

Nonetheless, we can copy&paste the changes to QoS 2 and let the application decide. If it doesn't want to deal with it, it can call ack() at the start of the handler and be done with it. This is what you are suggesting, right?

@petersilva I'd suggest some additions for the documentation (see my comment on the issue for a longer explanation and reference to the spec):

  1. "auto_ack=False allows the application to acknowledge" -> I would use stronger wording. The application has to absolutely make sure to either call ack() eventually or reconnect. In the latter case, it should expect to get the same message again so this is only an option if the error is likely to be temporary (e.g. database is restarting) or will affect all further messages (e.g. database is dead). If the application fails to do this, it is likely to process the same message over and over or exhaust the in-flight window and stall forever.
  2. The ack() calls have to be in the same order as the incoming messages.
  3. The application may receive messages that it doesn't expect and that it didn't even subscribe to. It must acknowledge them anyway. In practice, this means that the application has to set an on_message callback. We could add some code to the library to make this easier but I don't think we have to. After all, this is already so much easier than the other two requirements.

@petersilva
Copy link
Contributor Author

@BBBSnowball

about 1. OK I can clarify... Yes the language should be firmer... something like "the application must explicitly call the ack function for each message received." I would prefer to clarify the other two points before submitting an updated patch.

about 2. ... I don't think the acks have to be in the same order, as each message includes a message ID.
If you look in the patch, you will see that the ack member function uses the message ID to build the message sent back to the broker. Forcing the client to acknowledge in order is a significant ask... would not want to do that unless we really need to.

about 3. Huh? I have never seen an application receive messages it didn't subscribe to, that seems like an error... My understanding would be that that will never happen. Can you elaborate with an example?

@petersilva
Copy link
Contributor Author

@ralight I re-signed the ECA, and it seems happy now.

I started looking at how to do what you are asking for. The original patch was pretty straight-forward. I just looked at where the ack's were in the original code an put an if self.auto_ack barrier in front of them. I don't yet understand how to combine the two QoS's because qos==1 should only get puback, and never get a PUBREL and qos==2 should be the reverse from my reading. I imagine I need to add code to deal with qos==2 in handle_pubrel... and it should not affect qos==1. Are you asking me to combine the handlers for PUBACK and PUBREL?

Also, there is a timing thing... if the app is quick, and the link is slow, the app might issue the ack before the pubrec is even received... so I think there is additional state required to store 'ack_from_app' in the structure and emit PUBCOMP after both PUBREL and application ack are received. qos==2 is more complicated, and I think testing would be separate, so it is likely better as two separate patches. (higher probability of bugs in qos==2, while qos==1 is much easier.)

@petersilva
Copy link
Contributor Author

oh... and thanks for reading the patch! appreciated!

@BBBSnowball
Copy link

The simple change would be this (and I think @ralight was also going for that):

  1. Skip the call to _send_pubcomp in _handle_pubrel if auto_ack is disabled (same as for _send_puback in the existing patch).
  2. The handler will be called by _handle_pubrel. That handler will call ack (same as for QoS 1, again).
  3. ack looks up whether this is a QoS 1 or 2 message and sends PUBACK or PUBCOMP.

_handle_pubrel is removing the message from self._in_messages before calling the handler and I think this is essential to guarantee "at most once". Therefore, I don't think ack can use this to detect the type of QoS. Unless there is another simple way to detect this, we can add a dedicated ack method for QoS 2 and the handler is expected to call the right one. Alternatively, we could pass the complete message (rather than the mid only) to ack and use its qos field.

For the documentation updates: Yes, I agree that these requirements are weird and seem irrelevant. I'll probably ignore them if I know that the clients and broker don't care. However, the MQTT spec wants it that way (unless I'm misunderstanding something). I don't expect this to be an issue in practice. That's why I suggest that we document this and let the application deal with it if (and they may opt to ignore it).

Maybe something like this: The MQTT specification mandates that we send acks for all messages with QoS 1 or 2 (even ones that we didn't expect) and that we do so in the same order that we received the messages. It is up to the application to ensure this if this is relevant in your environment.

@ralight
Copy link
Contributor

ralight commented Jul 21, 2021

@petersilva The description by @BBBSnowball is correct - the delayed ack for a QoS 2 message would have to be on receiving a PUBREL and before sending a PUBCOMP, which makes it very similar to the QoS 1 case really, just a difference in which message gets sent out.

@petersilva
Copy link
Contributor Author

After prodding from the two of you, I just looked hard and found section 4.6 message ordering... so that means if the application returns acks out of order, they need to be queued until earlier messages are also acked, so my initial patch, even though it worked in practice, is wrong.

so... now when we ack, we have to iterate forward to find succeeding messages that have also been acked, and generate potentially multiple ack messages. the ReceiveMaximum also might have to be considered... If there are too many messages in flight, should an exception be raised or a log message written?

OK I have something to work with... It will take a few weeks.

@BBBSnowball
Copy link

so my initial patch, even though it worked in practice, is wrong.

I would argue that your patch is good. There is nothing that says that the library has to provide those guarantees and the application is in a better place to achieve this (if they handle callbacks in order - as many simple applications will - all is good; if not, they should already have machinery for ordering things) or know that it isn't necessary (many clients and brokers won't care - as you say, it works well in practice). If we end up with a queue of outgoing acks, it should have similar bounds as other queues in the application - again, better done by the application.

However, if we don't handle that part of MQTT, we should says so in the docstrings. That's the only change that I would do.

Alternatives:

  1. Make it an error to call ack in the wrong order. This takes some choice away from the application ("but my broker doesn't care") but would be relatively easy on our part.
  2. Auto-ack preceding messages so we can send the ack without breaking protocol. This would break the strong ack guarantees so I don't think this is a good idea.
  3. Buffer outgoing acks until all previous messages have been ack'ed. More complex to do but should work well except for the edge cases, that you have mentioned. Also, if the application forgets to ack a message, they will have a memory leak and won't send any further acks. Maybe this is a good thing because it will blow up in their faces - hopefully in testing, not in production ;-)

Maybe we should apply your patch as is (with some additions to the docstrings) and handle the more complex things if they turn out to be relevant in practice? Maybe change auto_ack from a bool to auto_mode with the values auto and app_acks_in_order so we can later add more complex modes?

@dYalib
Copy link

dYalib commented Sep 22, 2021

Great work! This is exactly the feature i am currently looking for. Can I help to get the feature with in the upstream version?

I am in favor of the way suggested by @BBBSnowball, first to install the basic functionality and then the complex things later, when they are really needed. Presumably, these are only a few special cases.

As @BBBSnowball already suggested, I would also change the parameter to type string.
As name I suggest "ack_mode" with values "auto" (default) and "app_controlled". Then there would be room for more (complex) variants. What do you think?

@petersilva
Copy link
Contributor Author

I´m fine with the suggestions and can look at implementing what is requested. People should just appreciate that the small fix I did was really simple, and didn´t require nearly the understanding of the code that the suggestions entail. I had a boolean that disabled sending the acks from the library, that´s it. This is a lot more involved. The patch will be a lot bigger, and completely different.

@petersilva
Copy link
Contributor Author

I actually think the requirement to acknowledge messages in order is an error in the specification. If I would do a patch... I would want to add some option, where by default, it would warn about acknowledgement out of order, and in the option would be to suppress the warning entirely. forcing acknowledgements to be in order is just wrong... and eventually someone will figure that out, and fix the spec.

@petersilva
Copy link
Contributor Author

note that the spec says "Server MUST treat every Topic as an Ordered Topic when it is forwarding messages on Non‑shared Subscriptions" .... All of the subscriptions I will be using are shared... we have not discussed what to do in the shared case, where strict ordering is non-sensical. My objection to the strict ordering in general is that I expected MQTT to work over UDP or other transports that do not prevent re-ordering in transit, but I see the MQTT spec specifically requires TCP/IP... which is a bit of a surprise to me... in that case ordering restriction can make sense in the non-shared case... it prevents using those other networks though... seems like a high price to pay.

@BBBSnowball
Copy link

People should just appreciate that the small fix I did was really simple

Oh, we do. I like it very much and want it to go in. That's why I did the code review :-)

During that review, I noticed the unfortunate requirements in the spec [1]. I still think that we should fix this by documentation instead of making the code more complex. I don't expect you to write the complex code that would be required to make things trivial for the application. In fact, I don't expect anyone to write that code unless it turns out that many applications need it.

I'd be happy with some simple doc changes and maybe change the bool to a string so someone else can add the more complex parts if they want to.

The other open point would be whether we want a similar feature for QoS 2. I think we can add this later if required and I don't think you should have to do the work for that if you don't need it. I see some value in adding it now if the change is simple. It will be simple in line count (most likely) but it is more involved regarding the interaction of datastructures and calls. Furthermore, I don't think we have any actual application that would use manual ack for QoS 2 so we don't know whether any API that we choose would be useful. For me, that's a good reason to defer it to another PR. I might be somewhat biased here because I'm only interested in QoS 1 ;-)

[1]: I agree that the requirement for ordered acks is somewhat surprising and unfortunate. However, I started with AMQP so my intuition about how it should be might very well be quite different from what makes sense for MQTT.

@dYalib
Copy link

dYalib commented Oct 21, 2021

@petersilva i have created a [PR] (MetPX#1) for you in which the parameter is a string and also the hint about the acknowledge order is included. Hope this meets the suggestions of @BBBSnowball and helps to finalize this PR ;-)

Thank you for your work!

@petersilva
Copy link
Contributor Author

petersilva commented Oct 23, 2021

yeah... after some thought, so to add this ordered mode I think puts too much on the client library. While the standard does say that acks should be in order, mosquitto, at least, does not seem to care. The standard also permits out of order acks in shared subscriptions, so brokers are likely not to care in practice. If there is some pedantic broker somewhere that wants to enforce the ordering as per the standard, it can always complain when receiving an ack out of order. There are mechanisms to return error codes across the wire.

for the client to detect ordering... it has to know whether the subscription is shared or not. The client currently does not have any code based on shared/not shared. It seems inelegant to add (parsing the topic strings?) for the client library to enforce ordering appropriately. so I think the broker is the one who should complain "I got an ack out of order" and, in practice they don't.

I think the boolean choice, as originally submitted is the "Right Thing" (tm) ... the based branch has changed since and the patch no longer applies, but I could do it again. Do people agree?

…n control over when acknowledgements are sent."

This reverts commit 6225aff.
…l over when acknowledgements are sent.

making comments more explicit about the application being required to acknowledge messages.
@dYalib
Copy link

dYalib commented Oct 25, 2021

yeah... after some thought, so to add this ordered mode I think puts too much on the client library. While the standard does say that acks should be in order, mosquitto, at least, does not seem to care. The standard also permits out of order acks in shared subscriptions, so brokers are likely not to care in practice. If there is some pedantic broker somewhere that wants to enforce the ordering as per the standard, it can always complain when receiving an ack out of order. There are mechanisms to return error codes across the wire.

I agree with you in principle. The current implementation is sufficient for me as well and so far it works as it should. So far, however, only in a small test environment without shared topics.

I think the boolean choice, as originally submitted is the "Right Thing" (tm) ... the based branch has changed since and the patch no longer applies, but I could do it again. Do people agree?

I think the discussion is basically about the decision:

  • boolean value: Sufficient at the moment, but should another option be added at some point, it is a breaking change.
  • string value: Would currently also have only 2 values. Finding a name for the values is difficult. Option for further values available, but could possibly never be needed.

Difficult to make a decision. Maybe a maintainer can contribute his experience?

@petersilva
Copy link
Contributor Author

fwiw, as hinted in initial post of patch, in the paho.mqtt.java library, they have setManualAcks(boolean ManualAcks):


 /**
  * If manualAcks is set to true, then on completion of the messageArrived callback
  * the MQTT acknowledgements are not sent.  You must call messageArrivedComplete
  * to send those acknowledgements.  This allows finer control over when the acks are
  * sent.  The default behaviour, when manualAcks is false, is to send the MQTT
  * acknowledgements automatically at the successful completion of the messageArrived
  * callback method.
  * @param manualAcks if set to true, MQTT acknowledgements are not sent.
  */
  public void setManualAcks(boolean manualAcks);

So boolean was deemed suffficient there.

@dYalib
Copy link

dYalib commented Oct 27, 2021

fwiw, as hinted in initial post of patch, in the paho.mqtt.java library, they have setManualAcks(boolean ManualAcks)

Oh, yes that is for me a clear vote for boolean. Because then it is also consistent.

BTW: HiveMQ has a similar feature and also uses a boolean value as parameter. Of course the implementation there is completely different and also obviously much more complex. See their PR

In short, I now vote for boolean.

@petersilva
Copy link
Contributor Author

also point out that the latest patch has firmed up the language as requested:

set to False, the application MUST acknowledge messages using the ack(message.mid)
entry point

also:

Normally, when a message is received, the library automatically
acknowledges immediately. auto_ack=False requires instead that the
application acknowledge receipt after it has completed processing of a
message using a the ack() method.

@dYalib
Copy link

dYalib commented Apr 25, 2022

Hi, do you see any change to merge this feature? I still need this ;-)

@jellybob
Copy link

jellybob commented Feb 21, 2023

Giving this another bump, is there anything preventing a merge that I could help move along?

So that we can depend upon it for an internal application I've forked the repository this PR originated from, cleaned up the commit history a little, and reapplied on top of the latest master branch here: https://github.com/DemandLogic/paho.mqtt.python

@couling
Copy link

couling commented Jun 26, 2023

I may have missed something in this thread, but a few words have rung alarm bells for me:

I actually think the requirement to acknowledge messages in order is an error in the specification.

No it isn't! It's a direct result of the main requirement of having FIFO topic+QOS. If ACKs are sent out of order followed by a disconnect, the reconnect procedure would re-send messages with gaps, where the gaps happen because an ACK was sent out of sequence.

My objection to the strict ordering in general is that I expected MQTT to work over UDP or other transports that do not prevent re-ordering in transit

No MQTT itself is explicitly TCP or websockets which guarantee perfect order. There is also a separate standard for MQTT-SN for use over UDP. While you may wish to use MQTT in conjunction with MQTT-SN, breaking this feature of MQTT breaks the client for the wider user base. [Please dont!]

Breaking strict ordering breaks any capability it has of maintaining a current status of the IOT fleet of devices. It completely trashes the use cases for "retained messages" resulting in downstream message brokers retaining the wrong message which could persist for a very long time.

For the documentation updates: Yes, I agree that these requirements are weird and seem irrelevant. I'll probably ignore them if I know that the clients and broker don't care. However, the MQTT spec wants it that way (unless I'm misunderstanding something). I don't expect this to be an issue in practice. That's why I suggest that we document this and let the application deal with it if (and they may opt to ignore it).

That's a worrying definition to set. Some of the points of the MQTT specification become critically important to applications even if they are not important to the message broker. The broker is not concerned with semantics of the messages only relaying the messages, but the rules of MQTT have been set to allow application developers build a framework ontop of MQTT. Breaking these rules can break the capabilities of MQTT in general and merely documenting that it's broken isn't of much use to the application developers.

If this must be handled in documentation then there really needs to be documented work arounds on how to comply with the standard despite Paho's code.

@BBBSnowball
Copy link

If ACKs are sent out of order followed by a disconnect, the reconnect procedure would re-send messages with gaps [...]

That is good context to know. Thanks. I had assumed the requirement was more about making it easier for the broker to track ack state (with a counter rather than a list or bitmap of un'acked messages).

I assume @petersilva was thinking about a case with semantically unrelated messages that are only ordered for technical reasons (i.e. the TCP connection). For example, you may be receiving messages from two sensors on two topics: A1, B1, A2, B2. If you ack B1 before A1 and then reconnect, the broker will re-send A1 but probably not B1 - which breaks that order, as you pointed out. The standard doesn't allow this, as far as I understand. Nonetheless, it could have been specified in a way that allows it (but still disallowing all cases that can actually be inconsistent, of course). Thus, I regard this as a reasonable wish and it is merely off-topic in a discussion about a library (which must consider the actual standard, of course).

If this must be handled in documentation then there really needs to be documented work arounds on how to comply with the standard despite Paho's code.

I think you are misunderstanding what I was saying. No workarounds are needed and Paho's code won't be in opposition to the standard in any way.

First, the PR won't change anything unless you opt-in to the new behavior. The default way is to ack messages before they are processed, which complies with the letter of the standard (I think... I'm actually not sure) but it will drop a message despite QoS 1 if you have a power outage at the wrong time. The standard doesn't require Paho to handle this race condition but it certainly doesn't stop us from doing so.

There are several ways to comply with the order requirement. It is hard for a library to choose one. If we handle this in Paho, we are likely to do more work than is actually needed for a particular application. Therefore, the suggestion is to leave the decision to the application.

The application could get it wrong, e.g. ack twice, not at all or in the wrong order. We can catch some (but not all) of these in Paho if we add additional state tracking for messages that are in flight but any way of doing so would be a bad fit for some application.

The original PR didn't consider the requirement for ack'ing in order. That's why I suggested some changes because we don't want any application to get this wrong by accident. Nonetheless, I don't think that Paho has any obligation to do extra work (in cpu cycle as well as developer time) to stop someone who wants to do this on purpose. I wouldn't advocate for anyone to break the standard but if someone does it in their test setup at home, I won't loose any sleep over it.

So, let me rephrase your (implicit) question so I can give a more useful answer: What should an application do to comply with the requirement?

The simplest case is this: Call ack() before returning from on_message(). This will handle messages in order so the acks will automatically be in order. That's what almost all applications will do, I expect.

Let me outline two cases that are more complex:

  1. Processing some messages takes a long time so you want to process other messages in parallel. You don't want to delay all acks so you write the message to persistent storage so you can safely ack it early. The parallel part only starts after the ack so acks will be in order. In other words, everything related to Paho will be in order and is the same as in the simple case. (In practice, you will probably write in batches and ack after flush but this will still be in order.)

  2. Messages are handled by different parts of the system and those work in parallel. You could wait for each message to be handled before you distribute the next one but you don't want to do that. Therefore, acks may be ready in an arbitrary order. You will have to keep track of in-flight messages and pass acks to Paho in the right order. You may want to ensure that the number of in-flight messages isn't unbounded and you probably want some kind of timeout (and maybe re-send/re-issue logic) so a single missing ack doesn't stall progress forever.

@petersilva
Copy link
Contributor Author

A third case, similar to @BBBSnowball ones that is widely used in my app:

  • I am using only MQTTv5 because my use cases require shared subscriptions.
  • n-subscriber processes consuming from a shared subscription. Each of the n-processes has no idea who else is participating in the share, and has no way of knowing which other share participants have consumed what from the queue. an individual subscriber or even the collective group of subscribers to a shared queue, even in principle, cannot ack in order.

petersilva added a commit to petersilva/paho.mqtt.python that referenced this pull request Oct 2, 2023
petersilva added a commit to petersilva/paho.mqtt.python that referenced this pull request Oct 3, 2023
@petersilva
Copy link
Contributor Author

closing in favour of: #753

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants