diff --git a/active/0028-assets/groupsm_leader_communication.png b/active/0028-assets/groupsm_leader_communication.png new file mode 100644 index 0000000..bc9b33b Binary files /dev/null and b/active/0028-assets/groupsm_leader_communication.png differ diff --git a/active/0028-assets/session-side-state.png b/active/0028-assets/session-side-state.png index a2812a7..3d2d00b 100644 Binary files a/active/0028-assets/session-side-state.png and b/active/0028-assets/session-side-state.png differ diff --git a/active/0028-assets/session-side-state.uml b/active/0028-assets/session-side-state.uml index c9a8621..fdcafea 100644 --- a/active/0028-assets/session-side-state.uml +++ b/active/0028-assets/session-side-state.uml @@ -14,6 +14,9 @@ skinparam DefaultTextAlignment left (connecting) --> (replaying) :\ lease_streams +(connecting) --> (disconnected) :\ +session induced disconnect + (replaying) --> (replaying): \ renew_stream_lease(vsn)\n\ @@ -28,6 +31,8 @@ renew_stream_lease(vsn)\n\ (replaying) --> (updating) :\ update_streams\n +(replaying) --> (disconnected) :\ +session induced disconnect (updating) --> (updating) :\ • update_streams\n\ @@ -42,4 +47,7 @@ renew_stream_lease(vsn_new) • invalidate\n\ • update_streams(invalid vsns) +(updating) --> (disconnected) :\ +session induced disconnect + @enduml diff --git a/active/0028-durable-shared-subscriptions.md b/active/0028-durable-shared-subscriptions.md index 49c7eee..04e6deb 100644 --- a/active/0028-durable-shared-subscriptions.md +++ b/active/0028-durable-shared-subscriptions.md @@ -3,6 +3,10 @@ ## Changelog * 2024-05-10: @savonarola Initial draft +* 2024-06-28: @savonarola + * Add the Agent abstraction + * Describe the two-side communication sequence between an Agent and the SGL + * Describe the stream reassignment algorithm ## Abstract @@ -28,11 +32,11 @@ SGL keeps track of topics belonging to the group, their streams, and stream stat The groups' consumers are persistent sessions. 
They connect to the SGL, and the SGL leases them streams to consume. Sessions consume these streams together with their proper streams but do not persist the progress. Instead, they report the progress to the SGL. -SGL is responsible for reassigning streams to the other group consumers in case a consumer disconnects and for reassigning streams to the new consumers. +SGL is responsible for reassigning streams to the group's other consumers in case a consumer disconnects and for reassigning streams to the new consumers. ![General Design](./0028-assets/general-design.png) -All communication between the consumers(sessions) and the SGL, SGLM, is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node. +All communication between the consumers (sessions) and the SGL, SGLM is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node. ### Session Side @@ -43,12 +47,18 @@ The SSubHandler is passive, i.e., it does not contain any running processes. It A session is responsible for: * Initializing the SSubHandler data on session bootstrap. * Delivering Shared Sub-related messages (from timers, from other entities) to the SSubHandler. -* Forwarding subscribe/unsubscribe `$shares/group/...` requests to the SSubHandler. +* Forwarding subscribe/unsubscribe `$shared/group/...` requests to the SSubHandler. * Querying stream states (`stream_state()`) from SSubHandler for replay and reporting replay results to the SSubHandler. ### Shared Subscription Session Handler -SSubHandler data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID. +SSubHandler *lies in the domain of the session*. It knows the session's state, stores shared subscriptions and the related data in the session's state, and uses the **Agent** abstraction to communicate with the SGL. Agent provides the interface lying *outside* the session's domain. 
+ +### Agent + +Agent is the entity that communicates with different SGLs. It speaks in terms of streams and iterators, not knowing about sessions, subscriptions, etc. + +Agent's data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID. ```erlang #{ @@ -63,6 +73,7 @@ Each GSFSM contains the following states: * `connecting` - the initial state, the GSFSM is looking for a Group Subscription Leader (SGL). * `replaying` - the GSFSM is connected to the SGL and provides stream states for replay. * `updating` - the GSFSM is connected to the SGL and is updating the set of streams. +* `disconnected` - the GSFSM is disconnected from the SGL. ### Protocol between Session and SGL @@ -128,10 +139,60 @@ In the `updating` state, the GSFSM accepts replay requests from the session side * `{update_stream_states, VersionNew, StreamsNew, VersionOld, StreamsOld, ...}` - to the SGL to update the stream states. Both for active streams and for taken-over streams. If taken-over streams are fully acked, the according flag is sent for them. -### State transitions +#### `disconnected` state + +GSFSM can pass to the `disconnected` state from any other state. It happens when the session disconnects. When entering the `disconnected` state, the GSFSM sends a `disconnect` message to the SGL with the latest stream states. + +#### State transitions ![State transitions](./0028-assets/session-side-state.png) +### Leader (SGL) side + +An Agent may have many GroupSMs (one for each subscription); each GroupSM is connected to its own Leader. So a Leader may have many Agents connected (through GroupSMs) to it. + +The Leader tracks the state of each agent connected to it and has its own state machine for each agent. The Leader's view of the GroupSM's state can be one of the following: + +* waiting_replaying +* replaying +* waiting_updating +* updating + +The target state of GroupSM and its representation in Leader is `replaying`. 
That is, when the GroupSM and the Leader agree on the leased streams, the Leader sends lease confirmations to the GroupSM, and the GroupSM sends iteration updates. + +Other states are used to gracefully reassign streams to the GroupSM. + +## Communication sequence + +Below is the sequence diagram of the interaction. The full cycle is shown, from replaying-replaying to replaying-replaying states. + +![GroupSM and Leader communication](./0028-assets/groupsm_leader_communication.png) + +## Stream reassignment (rebalance) + +We want the streams of a Leader to be evenly distributed among the agents. The periodic rebalancing algorithm is the following. + +* We discover new streams from DS, and mark them as _unassigned_ if there are any. +* We make changes only to _stable_ agents (those which are not in the `replaying` state). +* We check that replaying agents have the _desired_ number of streams. The desired number is calculated as the total number of streams divided by the number of agents (+1, if not divisible evenly): +```erlang +DesiredStreamCount = case + TotalStreamCount rem AgentCount of + 0 -> TotalStreamCount div AgentCount; + _ -> (TotalStreamCount div AgentCount) + 1 +end. +``` +* If the agent has more streams than desired, we _select streams for revocation_ and mark them as _revoked_. The streams still belong to the agent, but the agent goes to the `updating` cycle, finishes stream replay, and returns them to the Leader. As soon as a stream is returned, it becomes _unassigned_. +* If the agent has fewer streams than desired, we _select streams for assignment_ and mark them as assigned to the agent. The agent goes to the `updating` cycle in which it confirms the reception of the streams and starts replaying them. + +So, there is no "direct transfer" of streams between agents. When, e.g., a new agent connects, a typical scenario is: + +* The agent connects to the Leader. +* It may be assigned 0 streams just now as there are no free ones. 
+* On the next rebalance, we see that some agents became overpopulated (since the number of agents increased). So, we select streams for revocation. There are no free streams yet. We also still see that the recently connected agent is underpopulated. +* Gradually, the streams are returned to the Leader and become unassigned. This happens outside the iterations of the rebalance. +* On one of the next rebalances, we see again that the recently connected agent is underpopulated, but now there are some free streams. We assign them to the agent. + ### Configuration Changes ### Backwards Compatibility