Retry Experimental Federation Speedup #9908
looks good. a couple of suggestions.
  for event in events:
      events_by_room.setdefault(event.room_id, []).append(event)

- await make_deferred_yieldable(
+ # concurrently process room events, to allow parallelization through db queries
- # concurrently process room events, to allow parallelization through db queries
+ # concurrently process rooms, to allow parallelization through db queries
+ nested_events_and_dests: List[
+     List[Tuple[EventBase, Collection[str]]]
+ ] = await make_deferred_yieldable(
      defer.gatherResults(
          [
-             run_in_background(handle_room_events, evs)
+             run_in_background(
+                 get_federatable_events_and_destinations, evs
+             )
              for evs in events_by_room.values()
          ],
          consumeErrors=True,
      )
  )
there's a concurrently_execute in synapse.util.async_helpers which takes care of the logcontext boilerplate whilst also applying a limit to the number of concurrent operations (which would probably be a good thing here). I think you may as well use it while you're changing things.
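To illustrate what "applying a limit to the number of concurrent operations" means, here is a minimal sketch using plain asyncio rather than Twisted. It is not Synapse's concurrently_execute: the name gather_with_limit, its signature, and the fact that it returns results are all assumptions made for this example, and it does none of the logcontext handling the real helper provides.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_with_limit(
    func: Callable[[T], Awaitable[R]], args: Iterable[T], limit: int
) -> List[R]:
    """Hypothetical helper: call func on every arg, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(arg: T) -> R:
        # Each call waits here until one of the `limit` slots is free.
        async with semaphore:
            return await func(arg)

    # asyncio.gather returns results in the same order as its inputs.
    return list(await asyncio.gather(*(run_one(arg) for arg in args)))
```

With one call per room, a limit like this would keep a burst of events across many rooms from issuing an unbounded number of database queries at once.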
  events_by_room: Dict[str, List[EventBase]] = {}
  for event in events:
      events_by_room.setdefault(event.room_id, []).append(event)

- await make_deferred_yieldable(
+ # concurrently process room events, to allow parallelization through db queries
+ nested_events_and_dests: List[
+     List[Tuple[EventBase, Collection[str]]]
+ ] = await make_deferred_yieldable(
      defer.gatherResults(
          [
-             run_in_background(handle_room_events, evs)
+             run_in_background(
+                 get_federatable_events_and_destinations, evs
+             )
              for evs in events_by_room.values()
          ],
          consumeErrors=True,
      )
  )
+
+ # flatten list
+ events_and_dests = list(
+     itertools.chain.from_iterable(nested_events_and_dests)
+ )
I think it would be worth moving all this into a function with input events and output events_and_dests, so as to limit the scope of the intermediate variables events_by_room and nested_events_and_dests.
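A rough sketch of that refactoring, written with plain asyncio so it runs outside Synapse. The Event dataclass is a hypothetical stand-in for EventBase, compute_events_and_dests is an invented name, and the per_room callback stands in for get_federatable_events_and_destinations; the real code would keep the Twisted and logcontext machinery shown in the diff.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Awaitable, Callable, Collection, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for Synapse's EventBase."""

    event_id: str
    room_id: str


EventsAndDests = List[Tuple[Event, Collection[str]]]


async def compute_events_and_dests(
    events: Iterable[Event],
    per_room: Callable[[List[Event]], Awaitable[EventsAndDests]],
) -> EventsAndDests:
    """Input: events. Output: a flat list of (event, destinations) pairs."""
    # Intermediate variables stay local to this function.
    events_by_room: Dict[str, List[Event]] = {}
    for event in events:
        events_by_room.setdefault(event.room_id, []).append(event)

    # One concurrent call per room; results come back in room order.
    nested_events_and_dests = await asyncio.gather(
        *(per_room(evs) for evs in events_by_room.values())
    )

    # Flatten the per-room lists into a single list.
    return list(itertools.chain.from_iterable(nested_events_and_dests))
```

The caller then only sees events going in and events_and_dests coming out, which is the scoping benefit the review comment asks for.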
Hi @ShadowJonathan, are you able to continue working on this? 🙂
I need to rebase and think about what exactly these changes entail, but I'll put it on draft until I have more time for it (I don't expect much in the upcoming months). I still want to work on it, though, just not right now.
Putting this in closed state after further discussion; I'll reopen it once I have more time.
Fixes #9863
Re-introduces #9702, but adds parallelisation per #9863 (comment)
Signed-off-by: Jonathan de Jong <[email protected]>