Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split presence out of master #9820

Merged
merged 9 commits into from
Apr 23, 2021
23 changes: 22 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
to_device: The instance that writes to the to_device stream.
account_data: The instance that writes to the account data streams.
receipts: The instance that writes to the receipts stream.
presence: The instance that writes to the presence stream.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like each of these can be a list of instances?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, apparently it's a single-element list, but you should probably say that.

(why are these different to typing ?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(why are these different to typing ?)

Err, good Q. I think there are two options here: 1) make the single item lists just be strings, or 2) make typing be a single item list. I'm tempted to go with the latter so everything is consistent, but the former may be a bit less confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

making typing a single-item list sounds fine to me.

"""

events = attr.ib(
Expand All @@ -85,6 +89,11 @@ class WriterLocations:
type=List[str],
converter=_instance_to_list_converter,
)
presence = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)


class WorkerConfig(Config):
Expand Down Expand Up @@ -188,7 +197,14 @@ def read_config(self, config, **kwargs):

# Check that the configured writers for each of these streams also appear in
# `instance_map`.
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
for stream in (
"events",
"typing",
"to_device",
"account_data",
"receipts",
"presence",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
Expand All @@ -215,6 +231,11 @@ def read_config(self, config, **kwargs):
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")

if len(self.writers.presence) != 1:
raise ConfigError(
"Must only specify one instance to handle `presence` messages."
)

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
Expand Down
56 changes: 36 additions & 20 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@


class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
"""Parts of the PresenceHandler that are shared between workers and presence
writer"""

def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
Expand Down Expand Up @@ -308,17 +309,25 @@ def __init__(self, hs):
super().__init__(hs)
self.hs = hs

self._presence_writer_instance = hs.config.worker.writers.presence[0]

self._presence_enabled = hs.config.use_presence

# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
"m.presence",
hs.config.worker.writers.presence,
)

# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]

self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline = {}

self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
Expand Down Expand Up @@ -351,31 +360,32 @@ def send_user_sync(self, user_id, is_syncing, last_sync_ms):
)

def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
had recently stopped syncing.
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.

Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
# Safe to skip because we haven't yet told the master they were offline
# Only send if they weren't pending going offline. Otherwise it is safe
# to skip, because we haven't yet told the presence writer they were
# offline.
self.send_user_sync(user_id, True, self.clock.time_msec())

def mark_as_going_offline(self, user_id):
"""A user has stopped syncing. We wait before notifying the master as
its likely they'll come back soon. This allows us to avoid sending
a stopped syncing immediately followed by a started syncing notification
to the master
"""A user has stopped syncing. We wait before notifying the presence
writer, as it's likely they'll come back soon. This allows us to avoid
sending a stopped-syncing notification immediately followed by a
started-syncing notification to the presence writer.

Args:
user_id (str)
"""
self.users_going_offline[user_id] = self.clock.time_msec()

def send_stop_syncing(self):
"""Check if there are any users who have stopped syncing a while ago
and haven't come back yet. If there are poke the master about them.
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are, poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
Expand Down Expand Up @@ -491,9 +501,12 @@ async def set_state(self, target_user, state, ignore_status_msg=False):
if not self.hs.config.use_presence:
return

# Proxy request to master
# Proxy request to instance that writes presence
await self._set_state_client(
user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
instance_name=self._presence_writer_instance,
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
)

async def bump_presence_active_time(self, user):
Expand All @@ -504,9 +517,11 @@ async def bump_presence_active_time(self, user):
if not self.hs.config.use_presence:
return

# Proxy request to master
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(user_id=user_id)
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
)


class PresenceHandler(BasePresenceHandler):
Expand Down Expand Up @@ -1908,7 +1923,7 @@ def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
self._queue_presence_updates = True

# Whether this instance is a presence writer.
self._presence_writer = hs.config.worker.worker_app is None
self._presence_writer = self._instance_name in hs.config.worker.writers.presence

# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
Expand Down Expand Up @@ -1956,7 +1971,7 @@ def send_presence_to_destinations(
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances).

Must only be called on the master process.
Must only be called on the presence writer process.
"""

# This should only be called on a presence writer.
Expand Down Expand Up @@ -2002,10 +2017,11 @@ async def get_replication_rows(
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).

On workers this will query the master process via HTTP replication.
On workers this will query the presence writer process via HTTP replication.
"""
if instance_name != self._instance_name:
# If not local we query over http replication from the master
# If not local we query over http replication from the presence
# writer
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
Expand Down
18 changes: 16 additions & 2 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
CachesStream,
EventsStream,
FederationStream,
PresenceFederationStream,
PresenceStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
Expand Down Expand Up @@ -99,6 +101,10 @@ def __init__(self, hs: "HomeServer"):
self._instance_id = hs.get_instance_id()
self._instance_name = hs.get_instance_name()

self._is_presence_writer = (
hs.get_instance_name() in hs.config.worker.writers.presence
)

self._streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]
Expand Down Expand Up @@ -153,6 +159,14 @@ def __init__(self, hs: "HomeServer"):

continue

if isinstance(stream, (PresenceStream, PresenceFederationStream)):
# Only add the presence streams (PresenceStream and
# PresenceFederationStream) as sources on the instance in charge of
# presence.
if self._is_presence_writer:
self._streams_to_replicate.append(stream)

continue

# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue
Expand Down Expand Up @@ -350,7 +364,7 @@ def on_USER_SYNC(
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()

if self._is_master:
if self._is_presence_writer:
return self._presence_handler.update_external_syncs_row(
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
Expand All @@ -360,7 +374,7 @@ def on_USER_SYNC(
def on_CLEAR_USER_SYNC(
self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
) -> Optional[Awaitable[None]]:
if self._is_master:
if self._is_presence_writer:
return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
else:
return None
Expand Down
17 changes: 12 additions & 5 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,22 @@ class PresenceStream(Stream):
NAME = "presence"
ROW_TYPE = PresenceStreamRow

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
store = hs.get_datastore()

if hs.config.worker_app is None:
# on the master, query the presence handler
if hs.get_instance_name() in hs.config.worker.writers.presence:
# on the presence writer, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = presence_handler.get_all_presence_updates

from synapse.handlers.presence import PresenceHandler

assert isinstance(presence_handler, PresenceHandler)

update_function = (
presence_handler.get_all_presence_updates
) # type: UpdateFunction
else:
# Query master process
# Query presence writer process
update_function = make_http_update_function(hs, self.NAME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this need directing to the presence writer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update_function gets called with the instance to query (derived from the POSITION or RDATA commands that were just received), so it will always be the presence writer.


super().__init__(
Expand Down
6 changes: 3 additions & 3 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,10 @@ def get_state_resolution_handler(self) -> StateResolutionHandler:

@cache_in_self
def get_presence_handler(self) -> BasePresenceHandler:
if self.config.worker_app:
return WorkerPresenceHandler(self)
else:
if self.get_instance_name() in self.config.worker.writers.presence:
return PresenceHandler(self)
else:
return WorkerPresenceHandler(self)

@cache_in_self
def get_typing_writer_handler(self) -> TypingWriterHandler:
Expand Down
3 changes: 2 additions & 1 deletion tests/rest/client/v1/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from twisted.internet import defer

from synapse.handlers.presence import PresenceHandler
from synapse.rest.client.v1 import presence
from synapse.types import UserID

Expand All @@ -32,7 +33,7 @@ class PresenceTestCase(unittest.HomeserverTestCase):

def make_homeserver(self, reactor, clock):

presence_handler = Mock()
presence_handler = Mock(spec=PresenceHandler)
presence_handler.set_state.return_value = defer.succeed(None)

hs = self.setup_test_homeserver(
Expand Down