Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve the efficiency of _get_events_from_store_or_dest
Browse files Browse the repository at this point in the history
... by persisting any fetched events at the same time.
  • Loading branch information
richvdh committed Dec 13, 2019
1 parent 29edeea commit 44bd0fc
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 56 deletions.
107 changes: 53 additions & 54 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import UserID, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import Linearizer
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
Expand Down Expand Up @@ -618,7 +618,7 @@ async def _get_events_from_store_or_dest(
) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
Persists any events we don't already have.
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
Expand All @@ -638,33 +638,15 @@ async def _get_events_from_store_or_dest(
room_id,
)

def err_cb(f, e_id):
logger.warning(
"Error fetching missing state/auth event %s: %s",
e_id,
f.getErrorMessage(),
)

for batch in batch_iter(missing_events, 5):
deferreds = [
run_in_background(
self._get_event_and_persist,
destination=destination,
room_id=room_id,
event_id=e_id,
).addErrback(err_cb, e_id)
for e_id in batch
]

await make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)

# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(batch, allow_rejected=True))
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(missing_events, allow_rejected=True))
)

# check for events which were in the wrong room.
#
Expand Down Expand Up @@ -1072,38 +1054,55 @@ async def try_backfill(domains):

return False

async def _get_event_and_persist(
self, destination: str, room_id: str, event_id: str
async def _get_events_and_persist(
self, destination: str, room_id: str, events: Iterable[str]
):
"""Fetch the given event from a server, and persist it as an outlier.
"""Fetch the given events from a server, and persist them as outliers.
Raises:
Exception: if we couldn't find the event
Logs a warning if we can't find the given event.
"""
with nested_logging_context(event_id):
room_version = await self.store.get_room_version(room_id)

event = await self.federation_client.get_pdu(
[destination], event_id, room_version, outlier=True,
) # type: Optional[EventBase]
room_version = await self.store.get_room_version(room_id)

if event is None:
raise Exception(
"Server %s didn't return event %s" % (destination, event_id,)
)
event_infos = []

auth_events = await self._get_events_from_store_or_dest(
destination, room_id, event.auth_event_ids()
)
auth = {}
for auth_event_id in event.auth_event_ids():
e = auth_events.get(auth_event_id)
if e:
auth[(e.type, e.state_key)] = e

await self._handle_new_event(
destination, event, state=None, auth_events=auth,
)
async def get_event(event_id: str):
with nested_logging_context(event_id):
try:
event = await self.federation_client.get_pdu(
[destination], event_id, room_version, outlier=True,
)
if event is None:
logger.warning(
"Server %s didn't return event %s", destination, event_id,
)
return

# recursively fetch the auth events for this event
auth_events = await self._get_events_from_store_or_dest(
destination, room_id, event.auth_event_ids()
)
auth = {}
for auth_event_id in event.auth_event_ids():
ae = auth_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae

event_infos.append(_NewEventInfo(event, None, auth))

except Exception as e:
logger.warning(
"Error fetching missing state/auth event %s: %s %s",
event_id,
type(e),
e,
)

await concurrently_execute(get_event, events, 5)

await self._handle_new_events(
destination, event_infos,
)

def _sanity_check_event(self, ev):
"""
Expand Down
4 changes: 2 additions & 2 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def concurrently_execute(func, args, limit):
Args:
func (func): Function to execute, should return a deferred or coroutine.
args (list): List of arguments to pass to func, each invocation of func
gets a signle argument.
args (Iterable): List of arguments to pass to func, each invocation of func
gets a single argument.
limit (int): Maximum number of concurrent executions.
Returns:
Expand Down

0 comments on commit 44bd0fc

Please sign in to comment.