Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Implementing delayed message cancellation in pulsar #23907

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Denovo1998
Copy link
Contributor

@Denovo1998 Denovo1998 commented Jan 29, 2025

Fixes #xyz

Main Issue: #23149

PIP: #xyz

Motivation

In dynamic event-driven architectures, the relevance and timing of messages are crucial. Apache Pulsar currently supports delayed message delivery, which is beneficial for scheduling future processing. However, once scheduled, there is no native support to modify or cancel these messages based on changing circumstances. This limitation poses challenges in situations where messages become obsolete before their scheduled delivery or when system conditions change, necessitating the early processing or cancellation of delayed messages.

If we want to implement the cancellation of delayed messages at the moment, we can only store the delayed messages to be cancelled in the memory of the consumer side, and if the delay time is too long, there will be many problems.

The idea of this PR is to send a message as a MARK message for canceling the target message. The delay time needs to be one to two windows before the target message, with the window size being delayedDeliveryTickTimeMillis.

After the MARK message arrives, it is stored in a map. When the target message is triggered, it is directly filtered, and finally, both messages will be directly acknowledged.

Modifications

The following changes were introduced in Pulsar’s message dispatching components to enable delayed message cancellation:

  • Extended AbstractBaseDispatcher with the ability to recognize and handle cancellation properties in message metadata.
  • Added constants for identifying cancellation-specific properties in message metadata.

The key implementation detail involves checking each message for a cancellation marker as it becomes eligible for dispatch. If a message is marked for cancellation, it is dropped from the dispatch queue and its associated resources are cleaned up immediately.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Copy link

@Denovo1998 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Jan 29, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @Denovo1998. Since there's a major impact to the core parts of Pulsar, more effort would have to be spent in design decisions.

All major project decisions (excluding new security issues) are made by the whole community in the Apache way on the [email protected] mailing list.

The most important thing about engaging with any Apache project is that everyone is equal. All project participants with an opinion can express that opinion and, where appropriate, have the community consider it.
Unless you already have, please join the dev mailing list using these instructions , and start a discussion about this proposal.

Pulsar project decisions are usually documented in the form of Pulsar Improvement Proposals (PIPs) and a new PIP is started with a discussion on the mailing list.

In this case, I believe that the use case is very clear, but the solution might be different from what the current pull request includes.

It's a useful starting point to have this PR in place before starting a PIP, however it won't be merged before we have concluded on a solution that meets our functional and non-functional requirements. For example, we are looking for maintainability (fits the current architecture), performance (doesn't have a negative performance impact on other workloads) and compatibility (doesn't break existing brokers and clients).

At a quick glance it seems that from the architecture side, that the current DelayedDeliveryTracker should be leveraged to support this use case. For encoding the marker messages, Pulsar contains marker messages. It should be considered how these cancellation events would be stored in the topic. Compatibility concerns are around possible limitations with geo-replication. It might not even make sense to store this information in the topic in the first place. When the cancellation messages are stored in the topic, cancellation would only work when the DelayedDeliveryTracker state is such that it has "indexed" the delayed messages and the cancellation messages. For example, the InMemoryDelayedDeliveryTracker keeps state only in memory. The impact of the cancellation commands in the topic would be that before delivering any scheduled message, the state would have to be caught up before delivering a scheduled message. This is just a first thought about the possible impact of supporting cancellation. Due to such performance impacting details, it's more likely that this type of cancellation support would have to be enabled for a namespace or topic explicitly.

I hope this comment helps in making further progress.

@lhotari
Copy link
Member

lhotari commented Jan 30, 2025

When the cancellation messages are stored in the topic, cancellation would only work when the DelayedDeliveryTracker state is such that it has "indexed" the delayed messages and the cancellation messages. For example, the InMemoryDelayedDeliveryTracker keeps state only in memory. The impact of the cancellation commands in the topic would be that before delivering any scheduled message, the state would have to be caught up before delivering a scheduled message. This is just a first thought about the possible impact of supporting cancellation. Due to such performance impacting details, it's more likely that this type of cancellation support would have to be enabled for a namespace or topic explicitly.

Not directly related, but contains some details about the current delayed delivery tracker solution: #23912 .
For the BucketDelayedDeliveryTracker, it could be feasible to add the cancellation support since it keeps persistent state. However, the challenge that the cancellation commands might not be processed while making delayed message delivery decisions remains. It seems that some sort of RPC would be needed and storing the cancellation commands directly in the BucketDelayedDeliveryTracker persistence solution could be feasible. For the InMemoryDelayedDeliveryTracker, I don't think that it would be feasible to add support for cancellation at all since based on my initial analysis, I don't think that it makes sense to store the cancellation state in the topic itself due to the problems with unprocessed cancellation commands while making delivery decisions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants