Refactor session ↔︎ leader interaction for durable shared subscriptions #14428
* Rename "groups" to "ssubscribers" (shared subscribers).
* Eliminate the group_id concept, just use the subscription id.
* Use aliases for ssubscriber identification.
* Recreate the ssubscriber on invalidation. This reduces the possibility of stale messages and simplifies the ssubscriber lifecycle.
* Do not opaquely handle messages to ssubscribers.
We cannot drain messages at this level because they require unpacking on higher levels.
Looks good, but a review from someone more familiar with this would be best.
🍻
Co-authored-by: Thales Macedo Garitezi <[email protected]>
Pretty solid refactoring. Definitely became easier to comprehend and navigate.
-type options() :: #{
    session_id := emqx_persistent_session_ds:id()
}.

-record(ssubscriber_entry, {
Nit / random idea. Worth introducing something like "subtenant" here or some similar term, for it to be a bit shorter and easier to parse?
Do you refer to the "entry" data item or the ssubscriber terminology in general?
The latter.
@@ -92,7 +104,7 @@ can_subscribe(_State, #share{group = Group, topic = Topic}, _SubOpts) ->
         exists ->
             ok;
         {error, Class, Reason} ->
-            ?tp(warning, "Shared queue declare failed", #{
+            ?tp(debug, ds_shared_sub_agent_queue_declare_failed, #{
Q: At `debug` level it would be easier to lose such an error. Do you think there's a risk of flooding the logs with warnings? It's something that's not supposed to happen usually, especially if `unrecoverable`.
{MsFieldName,
    ?HOCON(
        emqx_schema:timeout_duration_ms(),
Nit. The duration unit is already encoded in the type here, so it feels a bit awkward to have the `_ms` suffix everywhere. E.g. `durable_queues { leader_ssubscriber_timeout_ms = "5s" }`.
-type stream_data() :: #{
    status := stream_status(),
    progress := progress(),
    use_finished := boolean()
Nit. I think a comment describing what it is and what it is for would be welcome here.
%% Should never happen.
reset(St);
Nit. Could be great to have better visibility into such events. Or perhaps something like a test-profile-only assertion?
I will add some updates in a follow-up.
Fixes EMQX-13515
Release version: v/e5.9.0
Here, we make several internal model simplifications, resulting in less code, less complex code, and thus, hopefully, in more robust code.
These changes do not affect the code interacting with durable shared subscriptions, e.g., durable session.
Old code structure
* [...] `emqx_persistent_session_ds_shared_subs`, symmetrically to `emqx_persistent_session_ds_subs`.
* `emqx_persistent_session_ds_shared_subs` talks to `emqx_ds_shared_sub_agent`, hiding session internals from it.
* [...] `emqx_ds_shared_sub_agent`;
* [...] `emqx_ds_shared_sub_agent`: stream leases and revokes;
* [...] `emqx_ds_shared_sub_agent`.
* `emqx_ds_shared_sub_agent` belongs to the `emqx_ds_shared_sub` app and talks to shared sub leaders. Technically it does little itself but delegates calls to a collection of `emqx_ds_shared_sub_group_sm` instances, one for each topic subscription.
* `emqx_ds_shared_sub_group_sm` represents a stateful connection to a leader. It actually talks to the leader, receives streams from it, etc.
* `emqx_ds_shared_sub_leader` is responsible for renewing and distributing streams across connected `emqx_ds_shared_sub_group_sm`s and recording progress.
Changes
The general structure of layers remains the same.
emqx_persistent_session_ds_shared_subs
We radically simplify `emqx_persistent_session_ds_shared_subs`. (The first step was done in #14227.)
The relevant commit is f65eccd.
Now `emqx_persistent_session_ds_shared_subs` does not have any additional state except `emqx_ds_shared_sub_agent`, to which it talks to receive streams.
emqx_ds_shared_sub_agent
* [...] subscribers (former `group_sm`s). This is not necessary for dispatching messages to individual subscription handlers.
* [...] `group_sm`s) an ability to request a complete recreation (in case an inconsistent state is detected).
The relevant commit is 81271d3.
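One of the commit messages in this PR mentions using aliases for ssubscriber identification and recreating the ssubscriber on invalidation to reduce stale messages. The sketch below is not the PR's code. It only illustrates, assuming the commit message refers to OTP process aliases (`erlang:alias/0`, `erlang:unalias/1`, available since OTP 24), why a fresh alias per incarnation cuts off stale traffic. The module name, message shape, and payload atoms are hypothetical.

```erlang
-module(alias_sketch).
-export([demo/0]).

%% Hypothetical illustration: one alias per ssubscriber incarnation.
%% Returns {FirstPayload, SecondPayload, dropped | delivered}.
demo() ->
    %% The first incarnation is identified by its own alias.
    Old = erlang:alias(),
    Old ! {ssub_msg, Old, lease_1},
    First = receive {ssub_msg, Old, P1} -> P1 end,
    %% Invalidation: deactivate the alias. Messages sent to it
    %% afterwards never enter the mailbox.
    true = erlang:unalias(Old),
    Old ! {ssub_msg, Old, stale_lease},
    %% Recreation: the new incarnation gets a fresh alias.
    New = erlang:alias(),
    New ! {ssub_msg, New, lease_2},
    Second = receive {ssub_msg, New, P2} -> P2 end,
    true = erlang:unalias(New),
    %% Check whether anything addressed to the old alias slipped through.
    Stale =
        receive
            {ssub_msg, Old, _} -> delivered
        after 0 ->
            dropped
        end,
    {First, Second, Stale}.
```

The design point is that the receiver does not need to filter stale messages by version or id: once the old alias is deactivated, the runtime discards them.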
emqx_ds_shared_sub_group_sm
* [...] `emqx_ds_shared_sub_group_sm` to `emqx_ds_shared_sub_subscriber`. We identify it throughout the code as `ssubscriber`s, shared subscribers.
* [...] `ssubscriber` finishes and requests the agent to be recreated. The states are [new] -> `connecting` (to the leader) -> `connected` -> `unsubscribing` -> [deleted].
* [...] `ssubscriber`'s states from stream states, and the related events are handled mostly independently.
The corresponding commit is 12acd63.
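The lifecycle listed above ([new] -> `connecting` -> `connected` -> `unsubscribing` -> [deleted]) can be sketched as a pure transition function. This is a minimal illustration, not the code from the PR: the event names (`subscribe`, `leader_connected`, `unsubscribe`, `leader_acked`, `invalidate`) and the `{stop, recreate}` return value are assumptions made for the example.

```erlang
-module(ssubscriber_fsm_sketch).
-export([next/2]).

-type state() :: new | connecting | connected | unsubscribing | deleted.
%% Hypothetical event names, chosen only for this sketch.
-type event() :: subscribe | leader_connected | unsubscribe | leader_acked | invalidate.

-spec next(state(), event()) ->
    {ok, state()} | {stop, recreate} | {error, {unexpected, state(), event()}}.
%% The linear lifecycle described in the PR.
next(new, subscribe) -> {ok, connecting};
next(connecting, leader_connected) -> {ok, connected};
next(connected, unsubscribe) -> {ok, unsubscribing};
next(unsubscribing, leader_acked) -> {ok, deleted};
%% On a detected inconsistency a live ssubscriber does not try to
%% repair itself: it finishes and asks to be recreated.
next(State, invalidate) when
    State =:= connecting; State =:= connected; State =:= unsubscribing
->
    {stop, recreate};
%% Everything else is reported rather than silently ignored.
next(State, Event) ->
    {error, {unexpected, State, Event}}.
```

Keeping stream states out of this function mirrors the separation described above: the connection lifecycle stays small, and stream-level events can be handled on their own.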
emqx_ds_shared_sub_leader
* [...] `gen_server` since it always appeared to have a single state.
* [...] `emqx_ds_shared_sub_subscriber` updates, we change the model. Instead of having subscribers with versioned sets of revoking/granted streams, we have a plain structure of streams with status and owner.
The corresponding commit is 3975178.
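To make the "plain structure of streams with status and owner" concrete, here is a minimal sketch of such a model as a map keyed by stream id. It is an illustration under assumptions, not the leader's actual data structure: the status atoms (`unassigned`, `granted`, `revoking`), the function names, and the error atoms are all hypothetical.

```erlang
-module(leader_streams_sketch).
-export([new/1, grant/3, start_revoke/2, finish_revoke/2, owned_by/2]).
-export_type([streams/0]).

%% Hypothetical status names for this sketch.
-type status() :: unassigned | granted | revoking.
-type entry() :: #{status := status(), owner := term()}.
-type streams() :: #{term() => entry()}.

%% All streams start without an owner.
-spec new([term()]) -> streams().
new(StreamIds) ->
    maps:from_list(
        [{Id, #{status => unassigned, owner => undefined}} || Id <- StreamIds]
    ).

%% Give an unassigned stream to an owner.
grant(Id, Owner, Streams) ->
    update(Id, unassigned, fun(E) -> E#{status := granted, owner := Owner} end, Streams).

%% Ask the current owner to give the stream back; the owner is kept
%% until the revoke is finished.
start_revoke(Id, Streams) ->
    update(Id, granted, fun(E) -> E#{status := revoking} end, Streams).

%% The owner released the stream; it can be granted again.
finish_revoke(Id, Streams) ->
    update(Id, revoking, fun(E) -> E#{status := unassigned, owner := undefined} end, Streams).

%% Sorted ids of the streams currently attributed to Owner.
owned_by(Owner, Streams) ->
    lists:sort([Id || {Id, #{owner := O}} <- maps:to_list(Streams), O =:= Owner]).

%% Apply Fun to the entry of Id only if it is in the expected status.
update(Id, From, Fun, Streams) ->
    case Streams of
        #{Id := #{status := From} = Entry} -> {ok, Streams#{Id := Fun(Entry)}};
        #{Id := _} -> {error, bad_status};
        _ -> {error, unknown_stream}
    end.
```

With one entry per stream there is no per-subscriber version to reconcile: the status and owner of each stream are the whole truth, which is the simplification the paragraph above describes.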
I tried to group the described changes into distinct relevant commits. However, since the changes were quite significant, individual commits do not keep the code in a working state. A few final changes get everything up and running.
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
* [...] `changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md` files (internal change of unreleased feature).