Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
apply actual change
Browse files Browse the repository at this point in the history
ShadowJonathan committed Apr 29, 2021
1 parent 27038b0 commit 9bedd51
Showing 1 changed file with 39 additions and 3 deletions.
42 changes: 39 additions & 3 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
@@ -13,17 +13,31 @@
# limitations under the License.

import abc
import itertools
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple, Collection
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Hashable,
Iterable,
List,
Optional,
Set,
Tuple,
)

from prometheus_client import Counter

from twisted.internet import defer

import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -352,7 +366,29 @@ async def get_federatable_events_and_destinations(
if dests
]

events_and_dests = await get_federatable_events_and_destinations(events)
events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)

# concurrently process room events, to allow parallelization through db queries
nested_events_and_dests: List[
List[Tuple[EventBase, Collection[str]]]
] = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
get_federatable_events_and_destinations, evs
)
for evs in events_by_room.values()
],
consumeErrors=True,
)
)

# flatten list
events_and_dests = list(
itertools.chain.from_iterable(nested_events_and_dests)
)

# Send corresponding events to each destination queue
await self._distribute_events(events_and_dests)
@@ -373,7 +409,7 @@ async def get_federatable_events_and_destinations(
events_processed_counter.inc(len(events))

event_processing_loop_room_count.labels("federation_sender").inc(
len({event.room_id for event in events})
len(events_by_room)
)

event_processing_loop_counter.labels("federation_sender").inc()

0 comments on commit 9bedd51

Please sign in to comment.