Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor session ↔︎ leader interaction for durable shared subscriptions #14428

Merged
merged 22 commits into from
Dec 20, 2024

Conversation

savonarola
Copy link
Contributor

@savonarola savonarola commented Dec 17, 2024

Fixes EMQX-13515

Release version: v/e5.9.0

Here, we make several internal model simplifications, resulting in less code, less complex code, and thus, hopefully, in more robust code.

These changes do not affect the code interacting with durable shared subscriptions, e.g., durable session.

Old code structure

  • DS session delegates shared sub handling to emqx_persistent_session_ds_shared_subs, symmetrically to emqx_persistent_session_ds_subs.
  • emqx_persistent_session_ds_shared_subs talks to emqx_ds_shared_sub_agent hiding session internals from it.
    • Also, delegates subscription/unsubscription requests to emqx_ds_shared_sub_agent;
    • receives events from emqx_ds_shared_sub_agent: stream leases and revokes;
    • sends stream progress to emqx_ds_shared_sub_agent.
  • emqx_ds_shared_sub_agent belongs to emqx_ds_shared_sub up and talks to shared sub leaders. Technically it does little itself but delegates calls to a collection of emqx_ds_shared_sub_group_sm's instances, one for each topic subscription.
  • emqx_ds_shared_sub_group_sm represents a stateful connection to a leader. It actually talks to the leader, receives streams from it, etc.
  • emqx_ds_shared_sub_leader is responsible for renewing and distributing streams across connected emqx_ds_shared_sub_group_sms and recording progress.

Changes

The general structure or layers remains the same.

emqx_persistent_session_ds_shared_subs

We radically simplify emqx_persistent_session_ds_shared_subs. (The first step was done in #14227).

  • As with regular subs, we identify subscription not by topic but by a unique ID. So When a session resubscribes to a topic, this is a completely new subscription.
  • We remove stream "finalization" after unsubscription from it.

The relevant commit is f65eccd

Now emqx_persistent_session_ds_shared_subs does not have any additional state except emqx_ds_shared_sub_agent to which it talks to receive streams.

emqx_ds_shared_sub_agent

  • Here we remove unnecessary preliminary unpacking of messages sent to subscribers (former group_sms). This is not necessary for dispatching messages to individual subscription handlers.
  • We address individual subscription handlers by an additional ref to guarantee the abscence of any stale messages.
  • We also provide to individual subscription handlers (former group_sms) an ability to request a complete recreation (in case inconsistent state is detected).

The relevant commit is 81271d3

emqx_ds_shared_sub_group_sm

  • We rename "Group State Machine" handler module emqx_ds_shared_sub_group_sm to emqx_ds_shared_sub_subscriber. We identify it throughout the code as ssubscribers, shared subscribers.
  • It does not have cyclic state changes and a complex state machine anymore. On invalidation, ssubscriber finishes and requests the agent to be recreated. The states are [new] -> connecting (to the leader) -> connected -> unsubscribing -> [deleted].
  • We discard "versioned sets of streams" (Andrew's idea). Instead, we grant and revoke individual streams. This decouples the ssubscriber's states from stream states, and the related events are handled mostly independently.

The corresponding commit is 12acd63.

emqx_ds_shared_sub_leader

  • We mostly keep the same the logic related to stream renewal and persistence.
  • We go away from the outdated "agent" naming. Previously, a session could be connected to a leader only once through its agent. Now, a session may have several ssubscribers connected (1 active and several unsubscribing), so this naming became more misleading.
  • We convert the module to a gen_server since it always appeared to have a single state.
  • Reflecting the new emqx_ds_shared_sub_subscriber updates, we change the model. Instead of having subscribers with versioned sets of revoking/granted streams, we have a plain structure of streams with status and owner.
  • We try to keep subscriber connection/validity handling distinct from stream granting/revocation. This results in smaller and less nested handlers.

The corresponding commit is 3975178

I tried to group the described changes into distinct relevant commits. However, since the changes were quite significant, individual commits do not keep the code in a working state. There are some final changes to make everything up and running.

Summary

PR Checklist

Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:

  • Added tests for the changes (tests present, we changed the internal behaviour)
  • [na] Added property-based tests for code which performs user input validation
  • [na] Change log has been added to changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md files (internal change of unreleased feature).
  • For internal contributor: there is a jira ticket to track this change
  • [na] Schema changes are backward compatible (not compatible but only for hidden parameters of an unreleased feature).

@savonarola savonarola force-pushed the 1206-refactor-shared-subs branch from 22b6812 to c2d17f5 Compare December 18, 2024 11:00
@savonarola savonarola changed the title 1206 refactor shared subs Refactor durable shared subscriptions Dec 18, 2024
@savonarola savonarola changed the title Refactor durable shared subscriptions Refactor session <-> leader interaction for durable shared subscriptions Dec 18, 2024
@savonarola savonarola changed the title Refactor session <-> leader interaction for durable shared subscriptions Refactor session ↔︎ leader interaction for durable shared subscriptions Dec 18, 2024
@savonarola savonarola marked this pull request as ready for review December 18, 2024 15:22
@savonarola savonarola requested review from lafirest and a team as code owners December 18, 2024 15:22
@savonarola savonarola force-pushed the 1206-refactor-shared-subs branch from 797a849 to 122cf89 Compare December 19, 2024 11:07
thalesmg
thalesmg previously approved these changes Dec 19, 2024
Copy link
Contributor

@thalesmg thalesmg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but a review from someone more familiar with this would be best.

🍻

rel/i18n/emqx_ds_shared_sub_schema.hocon Outdated Show resolved Hide resolved
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl Outdated Show resolved Hide resolved
Co-authored-by: Thales Macedo Garitezi <[email protected]>
@savonarola savonarola force-pushed the 1206-refactor-shared-subs branch from a7b4f17 to 8d64f5b Compare December 20, 2024 09:03
@savonarola savonarola merged commit f679e07 into emqx:master Dec 20, 2024
200 checks passed
@savonarola savonarola deleted the 1206-refactor-shared-subs branch December 20, 2024 16:42
Copy link
Contributor

@keynslug keynslug left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty solid refactoring. Definitely became easier to comprehend and navigate.


-type options() :: #{
session_id := emqx_persistent_session_ds:id()
}.

-record(ssubscriber_entry, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / random idea. Worth introducing something like "subtenant" here or some similar term, for it to be a bit shorter and easier to parse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you refer to the "entry" data item or the ssubscriber terminology in general?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter.

@@ -92,7 +104,7 @@ can_subscribe(_State, #share{group = Group, topic = Topic}, _SubOpts) ->
exists ->
ok;
{error, Class, Reason} ->
?tp(warning, "Shared queue declare failed", #{
?tp(debug, ds_shared_sub_agent_queue_declare_failed, #{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: On debug level would be easier to lose such error, do you think there's a risk of flooding the logs with warnings? It's something that's not supposed to usually happen, especially if unrecoverable.

Comment on lines 66 to 68
{MsFieldName,
?HOCON(
emqx_schema:timeout_duration_ms(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Duration unit is already encoded in the type here, so it feels a bit awkward to have _ms suffix everywhere. E.g. durable_queues { leader_ssubscriber_timeout_ms = "5s" }.

-type stream_data() :: #{
status := stream_status(),
progress := progress(),
use_finished := boolean()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. I think a comment describing what it is and what it is for would be welcome here.

Comment on lines +231 to +232
%% Should never happen.
reset(St);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Could be great to have better visibility into such events. Or perhaps something like a test-profile-only assertion?

@savonarola
Copy link
Contributor Author

I will add some updates in a follow-up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants