pubsub: consider providing a way to "forget" messages #691
The user could publish on another topic and ack the original message. Then they have more control over timing, and more insight into how many messages are in this state.
Since @jba described a better way to handle this, I'll close out this issue.
Could I convince you to change your mind here? I also have this use case, and I would much prefer to just be able to forget a message and allow it to time out. Publishing on a different topic means that our workers would have to listen on multiple subscriptions for each workload. We use MaxOutstandingMessages to control concurrency, but it is configured per subscription, so it is of no use when we're subscribed to multiple topics, one of which may rarely or never have messages. Essentially, we'd have to build our own concurrency regulator and make it independent of the subscriptions. The handover from one queue to another also introduces an error-prone situation where a disruption results in either a message that's on both queues or a dropped message (depending on whether we publish-then-ack or ack-then-publish, respectively). All of this can be managed, but it would be much, much simpler to just have a .Forget method analogous to .Ack and .Nack, which just lets the message time out.
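The handover failure modes described above can be illustrated with a toy model. This is a minimal sketch, assuming a plain map (the hypothetical `queue` type) as a stand-in for "this message is, or will again be, delivered on this subscription"; it makes no calls to `cloud.google.com/go/pubsub`, and `handover`, `copies` and `errCrash` are illustrative names only. It simulates a crash between the two steps and counts the surviving copies.

```go
package main

import (
	"errors"
	"fmt"
)

// queue is a hypothetical in-memory stand-in for a subscription: a key is
// present if the message is (or will again be) delivered there.
// It is NOT the cloud.google.com/go/pubsub API.
type queue map[string]bool

// errCrash simulates the worker dying between the two handover steps.
var errCrash = errors.New("simulated crash between steps")

// handover moves msgID from primary to retry. With publishFirst it publishes
// a copy and then acks the original; otherwise it acks first. If
// crashBetween is set, the second step never runs.
func handover(primary, retry queue, msgID string, publishFirst, crashBetween bool) error {
	if publishFirst {
		retry[msgID] = true // publish a copy to the retry topic
		if crashBetween {
			// The original was never acked, so it is redelivered on primary
			// while the copy also sits on retry: a duplicate.
			return errCrash
		}
		delete(primary, msgID) // ack the original
		return nil
	}
	delete(primary, msgID) // ack the original
	if crashBetween {
		// The original is acked but the copy was never published: a lost message.
		return errCrash
	}
	retry[msgID] = true // publish a copy to the retry topic
	return nil
}

// copies counts how many queues still hold msgID.
func copies(msgID string, qs ...queue) int {
	n := 0
	for _, q := range qs {
		if q[msgID] {
			n++
		}
	}
	return n
}

func main() {
	for _, publishFirst := range []bool{true, false} {
		primary, retry := queue{"m1": true}, queue{}
		err := handover(primary, retry, "m1", publishFirst, true)
		fmt.Printf("publishFirst=%v err=%v copies=%d\n",
			publishFirst, err, copies("m1", primary, retry))
	}
}
```

Running it prints `copies=2` for publish-then-ack (a duplicate) and `copies=0` for ack-then-publish (a lost message), which is the trade-off the comment describes.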
@cortesi, can you explain your use case again? First you say it's the same use case as this issue, which has to do with wanting to delay processing because of transient failures. But then you mention what seems to be an unrelated issue, about how MaxOutstandingMessages is per-subscription. Can you explain exactly how Forget will help you?
@jba Our use case is uncomplicated: we need to re-process messages with a delay if there's a transient failure. We're migrating from SQS, where this is quite simple. With PubSub, we handle this by setting a scheduled time on messages and re-queueing them repeatedly until they can be serviced. We have no way to add a delay, so if there's been a spike in transient errors, our queue consumers spend a lot of time re-pushing postponed messages when they should be doing real work. This is wasteful and error-prone.

The MaxOutstandingMessages issue is in fact very closely related. We can "forget" a message by doing nothing: we neither Ack nor Nack the message and just return from the worker goroutine. Once the message exceeds the ack deadline, it will be redelivered, and we can control the delay with MaxExtension. This would be a passable band-aid to cover up for the lack of an explicit delay in PubSub. However, we can't do this because of the way the Go API is structured. We are using MaxOutstandingMessages to control the concurrency of the queue consumers, and if we "forget" by doing nothing, the message still consumes an outstanding-message slot, which is not what we want.

To be concrete, say we have a fixed workload where 100% of CPU is consumed by 8 reasonably long-running concurrent worker goroutines. Setting MaxOutstandingMessages to 8 ensures that exactly 8 goroutines run at a time. If we "forget" a message by doing nothing, it occupies one of these 8 worker slots until it times out; in the meantime, only 7 workers are running and our system is under-utilized. We would like to be able to call .Forget() on the message instead, dropping it and moving on to the next message immediately.

The fact that we use MaxOutstandingMessages to control concurrency is also why your suggestion of using multiple queues would be very painful. Imagine we had two subscriptions, A and B, where B is the subscription on the topic we push to if there's a transient failure; it may or may not have messages on it. Now we have two subscribers, each with its concurrency controlled separately by its own MaxOutstandingMessages setting, so we can no longer use that setting to control overall concurrency. We can, with considerable complication, handle these scenarios by managing our own concurrency and decoupling it from MaxOutstandingMessages. I feel this should be catered for in google-cloud-go, though, and a .Forget method on messages would be a step forward.
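The "manage our own concurrency" workaround mentioned above usually means one counting semaphore shared by the handlers of every subscription. This is a minimal sketch under that assumption: plain channels (built by the hypothetical `feed` helper) stand in for the `Receive` callbacks of subscriptions A and B, and `runWorkers` and `limiter` are illustrative names, not part of any Pub/Sub library. It reports how many messages were processed and the peak number running at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a counting semaphore shared by all subscriptions' handlers, so
// total in-flight work is capped no matter which subscription a message came from.
type limiter chan struct{}

func (l limiter) acquire() { l <- struct{}{} }
func (l limiter) release() { <-l }

// runWorkers drains every source (stand-ins for subscriptions A and B) and
// processes each message under one global limit. It returns the number of
// messages processed and the peak concurrency observed.
func runWorkers(limit int, work time.Duration, sources ...<-chan string) (processed, peak int64) {
	sem := make(limiter, limit)
	var inFlight int64
	var readers, workers sync.WaitGroup
	for _, src := range sources {
		readers.Add(1)
		go func(src <-chan string) {
			defer readers.Done()
			for range src {
				sem.acquire() // block this source until a global slot frees up
				workers.Add(1)
				go func() {
					defer workers.Done()
					defer sem.release()
					n := atomic.AddInt64(&inFlight, 1)
					for { // record the highest concurrency seen so far
						p := atomic.LoadInt64(&peak)
						if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
							break
						}
					}
					time.Sleep(work) // placeholder for real processing
					atomic.AddInt64(&inFlight, -1)
					atomic.AddInt64(&processed, 1)
				}()
			}
		}(src)
	}
	readers.Wait() // no more workers.Add calls after this point
	workers.Wait()
	return processed, peak
}

// feed is a hypothetical helper that builds a closed channel of n messages.
func feed(n int, prefix string) <-chan string {
	ch := make(chan string, n)
	for i := 0; i < n; i++ {
		ch <- fmt.Sprintf("%s-%d", prefix, i)
	}
	close(ch)
	return ch
}

func main() {
	processed, peak := runWorkers(8, time.Millisecond, feed(20, "A"), feed(5, "B"))
	fmt.Printf("processed=%d peak=%d\n", processed, peak)
}
```

Because a slot is released only after the in-flight counter is decremented, the peak never exceeds the shared limit, even though two independent sources feed the pool.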
Thanks, that was very clear and convincing. Unfortunately, after internal discussions with the pubsub team, we've decided not to support delays in pubsub. Are you familiar with AppEngine task queues? We think they might be a better fit for your problem, and we're working on making them easier to use outside of AppEngine.
@jba We looked at task queues when we were considering AppEngine at the beginning of this project. PubSub was a better fit for a number of reasons. We could re-assess if the Go libraries mature and use outside AppEngine becomes easier, but in the meantime we need a more immediate solution. We understand that explicit delay is not a goal for PubSub more broadly. However, where does this leave us with regard to the .Forget API addition? With this in place, our problem is effectively solved without changes to PubSub itself.
@cortesi You can join the Cloud Tasks alpha at https://goo.gl/Ya0AZd. I realize that doesn't really answer your question.
@cortesi Also, we'd love your feedback about why PubSub is better than Cloud Tasks for your use case.
https://code-review.googlesource.com/c/gocloud/+/33250 is adding this feature.
I really think we should not add this feature to the client library for a couple of reasons:
We should talk about this feature in the context of Pub/Sub as a whole and not in the context of a single client library. Thanks!
@kamalaboulhosn According to #1147, the Ruby library already has at least one feature that is not available in the Go library, so that ship may have already sailed. :)
@edevil That looks to be true, yes, and admittedly there are still some inconsistencies among the different libraries that we are trying to limit. I'd rather not proliferate this kind of API difference further (and would instead prefer to deprecate the one in the Ruby library). ModifyAckDeadline, which is the underlying feature used to implement this, isn't really designed to be used to forget a message and delay its redelivery. It is really meant for situations where the client needs more time to process a message. We appreciate that this is a useful feature to have and have had some discussions about how we would implement it, but we want to make sure we are solving the use case correctly. It would be better to keep this kind of feature implemented on a separate branch or fork until the underlying Pub/Sub service has native support for it.
See this issue for motivation.
If user code fails to process a message due to transient failures in other services, the user might not want to make the message immediately available for redelivery. The message would just be redelivered quickly, only to fail again.
One way we can help is by providing a way to forget messages. The client library can stop extending the messages' deadlines and let the messages be redelivered naturally, without spending time and memory keeping track of their ack IDs.
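To make the proposal concrete, here is a minimal sketch of the client-side bookkeeping a "forget" operation would touch. Everything in it is an assumption: `leaseTracker` is a hypothetical, simplified model of a streaming-pull client that keeps extending the deadlines of ack IDs it holds and counts them against a flow-control limit; it is not the actual `cloud.google.com/go/pubsub` implementation. The recorded RPC strings are illustrative only (nack is modelled here as a ModifyAckDeadline with a zero deadline). The point is that `forget` frees the slot and stops deadline extension without sending any RPC, so the server redelivers once the current deadline expires.

```go
package main

import (
	"fmt"
	"sort"
)

// leaseTracker is a hypothetical model of a client's lease bookkeeping.
type leaseTracker struct {
	maxOutstanding int
	leased         map[string]bool // ack IDs whose deadlines are still being extended
	rpcs           []string        // illustrative record of RPCs that would be sent
}

func newLeaseTracker(maxOutstanding int) *leaseTracker {
	return &leaseTracker{maxOutstanding: maxOutstanding, leased: map[string]bool{}}
}

// add leases a newly received message if a flow-control slot is free.
func (t *leaseTracker) add(ackID string) bool {
	if len(t.leased) >= t.maxOutstanding {
		return false
	}
	t.leased[ackID] = true
	return true
}

// ack and nack free the slot and send an RPC to the server.
func (t *leaseTracker) ack(ackID string)  { t.release(ackID, "Acknowledge") }
func (t *leaseTracker) nack(ackID string) { t.release(ackID, "ModifyAckDeadline(0)") }

// forget frees the slot and stops extending the lease but sends nothing;
// the message is redelivered after its current ack deadline expires.
func (t *leaseTracker) forget(ackID string) { delete(t.leased, ackID) }

func (t *leaseTracker) release(ackID, rpc string) {
	if t.leased[ackID] {
		delete(t.leased, ackID)
		t.rpcs = append(t.rpcs, rpc+" "+ackID)
	}
}

// toExtend lists the ack IDs a periodic lease extender would still renew.
func (t *leaseTracker) toExtend() []string {
	ids := make([]string, 0, len(t.leased))
	for id := range t.leased {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	lt := newLeaseTracker(2)
	lt.add("a")
	lt.add("b")
	fmt.Println(lt.add("c"))            // false: both slots are in use
	lt.forget("a")                      // transient failure: let "a" time out
	fmt.Println(lt.add("c"))            // true: the slot was freed immediately
	fmt.Println(lt.toExtend(), lt.rpcs) // [b c] []
}
```

Compared with doing nothing in the handler, the forgotten message no longer counts against the limit, which addresses the under-utilization described earlier in the thread.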
@jba @kamalaboulhosn WDYT?