Skip to content

Commit

Permalink
docs(0028): extend shared subscription design description
Browse files Browse the repository at this point in the history
  • Loading branch information
savonarola committed Jun 28, 2024
1 parent bf8020d commit 02ae136
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified active/0028-assets/session-side-state.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions active/0028-assets/session-side-state.uml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ skinparam DefaultTextAlignment left
(connecting) --> (replaying) :\
lease_streams

(connecting) --> (disconnected) :\
session induced disconnect

(replaying) --> (replaying): \
renew_stream_lease(vsn)\n\

Expand All @@ -28,6 +31,8 @@ renew_stream_lease(vsn)\n\
(replaying) --> (updating) :\
update_streams\n

(replaying) --> (disconnected) :\
session induced disconnect

(updating) --> (updating) :\
• update_streams\n\
Expand All @@ -42,4 +47,7 @@ renew_stream_lease(vsn_new)
• invalidate\n\
• update_streams(invalid vsns)

(updating) --> (disconnected) :\
session induced disconnect

@enduml
71 changes: 66 additions & 5 deletions active/0028-durable-shared-subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
## Changelog

* 2024-05-10: @savonarola Initial draft
* 2024-06-28: @savonarola
* Add the Agent abstraction
* Describe thr two-side communication sequence between an Agent and the SGL
* Describe the stream reassignment algorithm

## Abstract

Expand All @@ -28,11 +32,11 @@ SGL keeps track of topics belonging to the group, their streams, and stream stat

The groups' consumers are persistent sessions. They connect to the SGL, and the SGL leases them streams to consume. Sessions consume these streams together with their proper streams but do not persist the progress. Instead, they report the progress to the SGL.

SGL is responsible for reassigning streams to the other group consumers in case a consumer disconnects and for reassigning streams to the new consumers.
SGL is responsible for reassigning streams to the other group of consumers in case a consumer disconnects and for reassigning streams to the new consumers.

![General Design](./0028-assets/general-design.png)

All communication between the consumers(sessions) and the SGL, SGLM, is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node.
All communication between the consumers(sessions) and the SGL, SGLM is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node.

### Session Side

Expand All @@ -43,12 +47,18 @@ The SSubHandler is passive, i.e., it does not contain any running processes. It
A session is responsible for:
* Initializing the SSubHandler data on session bootstrap.
* Delivering Shared Sub-related messages (from timers, from other entities) to the SSubHandler.
* Forwarding subscribe/unsubscribe `$shares/group/...` requests to the SSubHandler.
* Forwarding subscribe/unsubscribe `$shared/group/...` requests to the SSubHandler.
* Querying stream states (`stream_state()`) from SSubHandler for replay and reporting replay results to the SSubHandler.

### Shared Subscription Session Handler

SSubHandler data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID.
SSubHandler *lies in the domain of the session*. It knows the session's state, stores shared suscriptions and the related data in the session's state and uses **Agent** abstraction to communicate with the SGL. Agent provides the interface lying *outside* the session's domain.

### Agent

Agent is the entity that communicates with different SGLs. It speaks in the terms of streams and iterators, not knowing about sessions, subscriptions, etc.

Agent's data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID.

```erlang
#{
Expand All @@ -63,6 +73,7 @@ Each GSFSM contains the following states:
* `connecting` - the initial state, the GSFSM is looking for a Group Subscription Leader (SGL).
* `replaying` - the GSFSM is connected to the SGL and provides stream states for replay.
* `updating` - the GSFSM is connected to the SGL and is updating the set of streams.
* `disconnected` - the GSFSM is disconnected from the SGL.

### Protocol between Session and SGL

Expand Down Expand Up @@ -128,10 +139,60 @@ In the `updating` state, the GSFSM accepts replay requests from the session side

* `{update_stream_states, VersionNew, StreamsNew, VersionOld, StreamsOld, ...}` - to the SGL to update the stream states. Both for active streams and for taken-over streams. If taken-over streams are fully acked, the according flag is sent for them.

### State transitions
#### `disconnected` state

GSFSM can pass to the `disconnected` state from any other state. It happens when the session disconnects. When entering the `disconnected` state, the GSFSM sends a `disconnect` message to the SGL with the latest stream states.

#### State transitions

![State transitions](./0028-assets/session-side-state.png)

### Leader(SGL) side

Agent may have many GroupSMs (one for each subscription), each GroupSM is connected to its own Leader. So a Leader may have many Agents connected (through GroupSMs) to it.

The Leader tracks the state of each agent connected to it and has its own state machine for each agent. The Leader's view of the GroupSM's state can be one of the following:

* waiting_replaing
* replaying
* waiting_updating
* updating

The target state of GroupSM and its representation in Leader is `replaying`. That is, when the GroupSM and the Leader agree on the leased streams, the Leader sends lease confirmations to the GroupSM, and the GroupSM sends iteration updates.

Other states are used to gracefully reassign streams to the GroupSM.

## Communication sequence

Below is the sequence diagram of the interaction. The full cycle is shown, from replaying-replaying to replaying-replaying states.

![GroupSM and Leader communication](./0028-assets/groupsm_leader_communication.png)

## Stream reassignment (rebalance)

We want the streams of a Leader be evenly distributed among the agents. The periodical rebalancing algorithm is the following.

* We discover new streams from DS, and mark _unassigned_ if there are any.
* We do any changes only to _stable_ agents (those which are not in the `replaying` state).
* We check that replaying agents have the _desired_ number of streams. The desired number is calculated as the total number of streams divided by the number of agents (+1, if not divisible evenly):
```erlang
DesiredStreamCount = case
TotalStreamCount rem AgentCount of
0 -> TotalStreamCount div AgentCount;
_ -> (TotalStreamCount div AgentCount) + 1
end.
```
* If the agent has more streams than desired, we _select streams for revocation_ and mark them as _revoked_. The streams still belong to the agent, but the agent goes to the 'updating' cycle, finishes stream replay, and returns them to the Leader. As soon as a stream is returned, it becomes _unassigned_.
* If the agent has fewer streams than desired, we _select streams for assignment_ and mark them as assigned to the agent. The agent goes to the `updating` cycle in which it confirms the reception of the streams and starts replaying them.

So, there is no "direct transfer" of streams between agents. When e.g., a new agent connects, a typical scenario is:

* The agent connects to the Leader.
* It may be assigned 0 streams just now as there are no free ones.
* On the next rebalance, we see that some agents became overpopulated (since the number of agents increased). So, we select streams for revocation. There are no free streams yet. We also still see that the recently connected agent is underpopulated.
* Gradually, the streams are returned to the Leader and become unassigned. This happens outside the iterations of the rebalance.
* On one of the next rebalances, we see again that the recently connected agent is underpopulated, but now there are some free streams. We assign them to the agent.

### Configuration Changes

### Backwards Compatibility
Expand Down

0 comments on commit 02ae136

Please sign in to comment.