This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add experimental support for sharding event persister. #8170

Merged 6 commits on Sep 2, 2020
Changes from 4 commits
1 change: 1 addition & 0 deletions changelog.d/8170.feature
@@ -0,0 +1 @@
Add experimental support for sharding event persister.
23 changes: 20 additions & 3 deletions synapse/config/_base.py
@@ -832,11 +832,28 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key.
"""

# If multiple instances are not defined we always return true.
if not self.instances or len(self.instances) == 1:
return True

return self.get_instance(key) == instance_name

def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.

Note: For things like federation sending, the config for which instance
does the sending is only known to the sender instance itself when there
is a single sender. Therefore `should_handle` should be used where
possible.
"""

if not self.instances:
return "master"

if len(self.instances) == 1:
return self.instances[0]

# We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that
# index.
@@ -846,7 +863,7 @@ def should_handle(self, instance_name: str, key: str) -> bool:
dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
return self.instances[remainder] == instance_name
return self.instances[remainder]


__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
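
As a self-contained illustration of the scheme get_instance implements, here is a minimal sketch assuming only what the diff above shows; the helper name pick_instance and the example key are invented:

from hashlib import sha256
from typing import List

def pick_instance(key: str, instances: List[str]) -> str:
    # Mirrors ShardedWorkerHandlingConfig.get_instance: hash the key and
    # index into the instance list with the digest modulo the list length.
    if not instances:
        return "master"
    if len(instances) == 1:
        return instances[0]
    dest_hash = sha256(key.encode("utf8")).digest()
    dest_int = int.from_bytes(dest_hash, byteorder="little")
    return instances[dest_int % len(instances)]

# The mapping is deterministic: the same room always lands on the same
# persister for a fixed instance list.
assert pick_instance("!room:example.com", ["p1", "p2"]) == pick_instance(
    "!room:example.com", ["p1", "p2"]
)

One consequence of the modulo mapping is that reordering or resizing the instance list reroutes existing rooms to different persisters.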
1 change: 1 addition & 0 deletions synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
def get_instance(self, key: str) -> str: ...
33 changes: 25 additions & 8 deletions synapse/config/workers.py
@@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import attr

from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
option expecting a list of strings.
"""

if isinstance(obj, str):
return [obj]
return obj


@attr.s
class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams.

Attributes:
events: The instance that writes to the event and backfill streams.
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
"""

events = attr.ib(default="master", type=str)
events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str)


@@ -108,12 +122,15 @@ def read_config(self, config, **kwargs):
# Check that the configured writer for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
instance = getattr(self.writers, stream)
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
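
To make the converter's behaviour concrete, here is a minimal sketch mirroring the diff above; the persister names are invented. Both the legacy single-string form and the new list form of the events writer option normalise to a list:

from typing import List, Union

import attr

def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
    # Accept either the legacy single string or a list of strings.
    if isinstance(obj, str):
        return [obj]
    return obj

@attr.s
class WriterLocations:
    events = attr.ib(
        default=["master"], type=List[str], converter=_instance_to_list_converter
    )
    typing = attr.ib(default="master", type=str)

# Legacy config: a bare string is normalised to a one-element list ...
assert WriterLocations(events="master").events == ["master"]
# ... while the new sharded form passes through unchanged.
assert WriterLocations(events=["persister1", "persister2"]).events == [
    "persister1",
    "persister2",
]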
38 changes: 27 additions & 11 deletions synapse/handlers/federation.py
@@ -923,7 +923,8 @@ async def backfill(self, dest, room_id, limit, extremities):
)
)

await self._handle_new_events(dest, ev_infos, backfilled=True)
if ev_infos:
Member: Is this if-statement just an optimization?

Member Author: Oh err, somewhere along the line I think something got very unhappy about empty lists. I forget the details now, or whether it's even necessary 😕

await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)

# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1216,7 +1217,7 @@ async def get_event(event_id: str):
event_infos.append(_NewEventInfo(event, None, auth))

await self._handle_new_events(
destination, event_infos,
destination, room_id, event_infos,
)

def _sanity_check_event(self, ev):
@@ -1363,15 +1364,17 @@ async def do_invite_join(
)

max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
origin, room_id, auth_chain, state, event, room_version_obj
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.config.worker.writers.events, "events", max_stream_id
self.config.worker.events_shard_config.get_instance(room_id),
"events",
max_stream_id,
)

# Check whether this room is the result of an upgrade of a room we already know
@@ -1625,7 +1628,7 @@ async def on_invite_request(
)

context = await self.state_handler.compute_event_context(event)
await self.persist_events_and_notify([(event, context)])
await self.persist_events_and_notify(event.room_id, [(event, context)])

return event

@@ -1652,7 +1655,9 @@ async def do_remotely_reject_invite(
await self.federation_client.send_leave(host_list, event)

context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)])
stream_id = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)

return event, stream_id

@@ -1900,7 +1905,7 @@ async def _handle_new_event(
)

await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
@@ -1913,6 +1918,7 @@ async def _handle_new_event(
async def _handle_new_events(
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
@@ -1944,6 +1950,7 @@ async def prep(ev_info: _NewEventInfo):
)

await self.persist_events_and_notify(
room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -1954,6 +1961,7 @@ async def prep(ev_info: _NewEventInfo):
async def _persist_auth_tree(
self,
origin: str,
room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
@@ -1968,6 +1976,7 @@ async def _persist_auth_tree(

Args:
origin: Where the events came from
room_id
auth_events
state
event
@@ -2042,17 +2051,20 @@ async def _persist_auth_tree(
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
]
],
)

new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)

return await self.persist_events_and_notify([(event, new_event_context)])
return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)

async def _prep_event(
self,
@@ -2903,21 +2915,25 @@ async def _check_key_revocation(self, public_key, url):

async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.

Args:
room_id:
event_and_contexts:
backfilled: Whether these events are a result of
backfilling or not
"""
if self.config.worker.writers.events != self._instance_name:
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
Comment on lines +2931 to +2932

Member: Edit: I wrote the below before realizing that `instance` is also used in the `_send_events` call below; I thought it was worth leaving, though, in case it jiggles a good idea loose.

It looks like this pattern is used quite a bit. The comments for ShardedWorkerHandlingConfig say to prefer should_handle, which seems like it could be used here:

    if self.config.worker.events_shard_config.should_handle(self._instance_name, room_id):

Although I think some of this could be simplified more if ShardedWorkerHandlingConfig knew what the current instance was (maybe should_handle wouldn't need the instance passed in?).

Member Author: nods. The problem with giving ShardedWorkerHandlingConfig the current instance name is that I don't think we know what the instance name is during config parsing (outside of the worker config parsing).

Member Author (@erikjohnston, Sep 2, 2020): Actually, I think we do need to use should_handle first to technically conform to the docs of ShardedWorkerHandlingConfig.

Member Author: Though I guess the fact that we go on to do an HTTP replication hit means that get_instance has to work.

Member: I think it is OK to use get_instance here; that and should_handle should have matching logic after all!

result = await self._send_events(
instance_name=self.config.worker.writers.events,
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
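
To make the review discussion above concrete, here is a hedged sketch of the invariant the reviewers agree on, using ShardedWorkerHandlingConfig as defined earlier in this diff; the instance names and room ID are invented:

config = ShardedWorkerHandlingConfig(["persister1", "persister2"])
room_id = "!room:example.com"

# should_handle is now defined as get_instance(key) == instance_name, so
# the two views of the shard map must agree for every instance:
for name in ["persister1", "persister2"]:
    assert config.should_handle(name, room_id) == (
        config.get_instance(room_id) == name
    )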
14 changes: 8 additions & 6 deletions synapse/handlers/message.py
@@ -383,9 +383,8 @@ def __init__(self, hs: "HomeServer"):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._is_event_writer = (
self.config.worker.writers.events == hs.get_instance_name()
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()

self.room_invite_state_types = self.hs.config.room_invite_state_types

@@ -911,9 +910,10 @@ async def handle_new_client_event(

try:
# If we're a worker we need to hit out to the master.
if not self._is_event_writer:
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
result = await self.send_event(
instance_name=self.config.worker.writers.events,
instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -981,7 +981,9 @@ async def persist_and_notify_client_event(

This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
assert self._events_shard_config.should_handle(
self._instance_name, event.room_id
)

if ratelimit:
# We check if this is a room admin redacting an event so that we
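
The route-or-persist shape used in handle_new_client_event recurs across the handlers in this PR; the following is a simplified, self-contained sketch of that shape only, with all names invented rather than taken from the Synapse API:

import asyncio
from typing import Dict

class EventRouterSketch:
    def __init__(self, instance_name: str, room_to_writer: Dict[str, str]):
        self._instance_name = instance_name
        self._room_to_writer = room_to_writer

    async def handle_event(self, room_id: str) -> str:
        # Look up which instance owns this room's events stream.
        writer = self._room_to_writer[room_id]
        if writer != self._instance_name:
            # Not our shard: the real code forwards the event over HTTP
            # replication and returns the remote persister's result.
            return "forwarded to " + writer
        # Our shard: persist locally and notify.
        return "persisted locally"

router = EventRouterSketch("p1", {"!a:x": "p1", "!b:x": "p2"})
print(asyncio.run(router.handle_event("!b:x")))  # forwarded to p2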
12 changes: 9 additions & 3 deletions synapse/handlers/room.py
@@ -804,7 +804,9 @@ async def create_room(

# Always wait for room creation to propagate before returning
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", last_stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
last_stream_id,
)

return result, last_stream_id
@@ -1263,7 +1265,9 @@ async def shutdown_room(
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
"events",
stream_id,
)
else:
new_room_id = None
@@ -1293,7 +1297,9 @@ async def shutdown_room(

# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
stream_id,
)

await self.room_member_handler.forget(target_requester.user, room_id)
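
The wait_for_stream_position calls above all follow one pattern: the (possibly remote) writer returns a stream position, and the calling worker blocks until its own replication connection has caught up to that position before reading from caches. A simplified sketch of those semantics, assuming an invented class rather than the actual replication handler:

import asyncio

class StreamPositionWaiterSketch:
    def __init__(self) -> None:
        self._position = 0
        self._changed = asyncio.Condition()

    async def advance_to(self, position: int) -> None:
        # Called as replication rows for the writer's stream arrive.
        async with self._changed:
            self._position = max(self._position, position)
            self._changed.notify_all()

    async def wait_for_stream_position(self, position: int) -> None:
        # Block until local replication has caught up to `position`.
        async with self._changed:
            await self._changed.wait_for(lambda: self._position >= position)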
7 changes: 0 additions & 7 deletions synapse/handlers/room_member.py
@@ -90,13 +90,6 @@ def __init__(self, hs: "HomeServer"):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles

self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence

self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
12 changes: 9 additions & 3 deletions synapse/replication/http/federation.py
@@ -65,10 +65,11 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
async def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
@@ -88,14 +89,19 @@ async def _serialize_payload(store, event_and_contexts, backfilled):
}
)

payload = {"events": event_payloads, "backfilled": backfilled}
payload = {
"events": event_payloads,
"backfilled": backfilled,
"room_id": room_id,
}

return payload

async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]

event_payloads = content["events"]
Expand All @@ -120,7 +126,7 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
room_id, event_and_contexts, backfilled
)

return 200, {"max_stream_id": max_stream_id}
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ def __init__(self, hs):
if isinstance(stream, (EventsStream, BackfillStream)):
# Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence.
if hs.config.worker.writers.events == hs.get_instance_name():
if hs.get_instance_name() in hs.config.worker.writers.events:
self._streams_to_replicate.append(stream)

continue