diff --git a/Cargo.lock b/Cargo.lock index 8a8099bc6d98..2223ffdab7df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,18 +323,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.147" +version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" +checksum = "e53f64bb4ba0191d6d0676e1b141ca55047d83b74f5607e6d8eb88126c52c2dc" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.147" +version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" +checksum = "a55492425aa53521babf6137309e7d34c20bbfbbfcfe2c7f3a047fd1f6b92c0c" dependencies = [ "proc-macro2", "quote", @@ -366,9 +366,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.102" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" +checksum = "4ae548ec36cf198c0ef7710d3c230987c2d6d7bd98ad6edc0274462724c585ce" dependencies = [ "proc-macro2", "quote", diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 60774b240d9f..11c5b10c9546 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import time import urllib.parse from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Tuple @@ -324,6 +325,10 @@ async def push_bulk( "left": list(device_list_summary.left), } + if len(serialized_events) == 0 and len(ephemeral) == 0: + logger.info("Returning early on transaction: no events to send") + return True + try: await self.put_json( uri=uri, @@ -365,7 +370,19 @@ async def push_bulk( def _serialize( self, service: "ApplicationService", events: Iterable[EventBase] ) -> List[JsonDict]: + new_events = [] time_now = self.clock.time_msec() + + for event in events: + if int(round(time.time() * 1000)) - event.origin_server_ts > (15 * 60 * 1000): + logger.warning("Dropping event (due to age) %s" % event.event_id) + continue + if service.id != "github" and service.is_interested_in_user(event.sender) and event.sender.endswith(":t2bot.io"): + logger.warning("Dropping event (due to echo) %s" % event.event_id) + continue + logger.info("Allowing @ fallback: %s" % event.event_id) + new_events.append(event) + return [ serialize_event( e, @@ -384,5 +401,5 @@ def _serialize( ), ), ) - for e in events + for e in new_events ] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c4c0bc7315b4..2402bbbc9ba5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -369,7 +369,7 @@ async def get_pdu( destinations: Collection[str], event_id: str, room_version: RoomVersion, - timeout: Optional[int] = None, + timeout: Optional[int] = 15000, ) -> Optional[PulledPduInfo]: """Requests the PDU with given origin and ID from the remote home servers. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bb20af6e91ed..1b4cfc89d9b0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -87,7 +87,7 @@ # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. -TRANSACTION_CONCURRENCY_LIMIT = 10 +TRANSACTION_CONCURRENCY_LIMIT = 50 # T2B: Raise from 10 logger = logging.getLogger(__name__) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 3ad483efe079..a58cca4f826f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -705,6 +705,11 @@ def send_presence_to_destinations( for destination in destinations: if destination == self.server_name: continue + + # T2B: Skip sending presence to servers we know don't support it + if destination == "matrix.org": + continue + if not self._federation_shard_config.should_handle( self._instance_name, destination ): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd39d4d1113a..74d6897aa8ea 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -106,7 +106,7 @@ async def get_room_state( ) async def get_event( - self, destination: str, event_id: str, timeout: Optional[int] = None + self, destination: str, event_id: str, timeout: Optional[int] = 15000 ) -> JsonDict: """Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 205fd16daa98..4e2d1b2331fa 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -100,14 +100,32 @@ async def on_PUT( logger.debug("Decoded %s: %s", transaction_id, str(transaction_data)) + edus_before_filter = len(transaction_data.get("edus", [])) + + filtered_edus = [] + for edu in transaction_data.get("edus", []): + edu_type = edu.get('edu_type', 'io.t2bot.ignored') + if edu_type == 'io.t2bot.ignored': + continue + if edu_type == 'm.presence': + continue + if edu_type == 'm.receipt': + continue + if edu_type == 'm.typing': + continue + filtered_edus.append(edu) + logger.info( - "Received txn %s from %s. (PDUs: %d, EDUs: %d)", + "Received txn %s from %s. (PDUs: %d, Accepted EDUs: %d, Ignored EDUs: %d)", transaction_id, origin, len(transaction_data.get("pdus", [])), - len(transaction_data.get("edus", [])), + len(filtered_edus), + edus_before_filter - len(filtered_edus), ) + transaction_data["edus"] = filtered_edus + if issue_8631_logger.isEnabledFor(logging.DEBUG): DEVICE_UPDATE_EDUS = [ EduTypes.DEVICE_LIST_UPDATE, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f7223b03c364..b8c9080f447c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2151,7 +2151,7 @@ async def _run_push_actions_and_persist_event( # persist_events_and_notify directly.) assert not event.internal_metadata.outlier - if not backfilled and not context.rejected: + if False and not backfilled and not context.rejected: min_depth = await self._store.get_min_depth(event.room_id) if min_depth is None or min_depth > event.depth: # XXX richvdh 2021/10/07: I don't really understand what this diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4cf593cfdcbc..ff4f1c1b2a0b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1462,9 +1462,10 @@ async def _persist_events( a room that has been un-partial stated. """ - await self._bulk_push_rule_evaluator.action_for_events_by_user( - events_and_context - ) + # T2B: Disable push processing. + #await self._bulk_push_rule_evaluator.action_for_events_by_user( + # events_and_context + #) try: # If we're a worker we need to hit out to the master. diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 3610b6bf785e..bdda5e2f0f6e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -70,7 +70,8 @@ def __init__(self, hs: "HomeServer"): # Guard to ensure we only process deltas one at a time self._is_processing = False - if self.update_user_directory: + # T2B: Disable user directory + if self.update_user_directory and False: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before @@ -109,6 +110,11 @@ async def search_users( def notify_new_event(self) -> None: """Called when there may be more deltas to process""" + + # T2B: Disable user directory + if True: + return + if not self.update_user_directory: return @@ -133,6 +139,10 @@ async def handle_local_profile_change( # FIXME(#3714): We should probably do this in the same worker as all # the other changes. + # T2B: Disable user directory + if True: + return + if await self.store.should_include_local_user_in_dir(user_id): await self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url @@ -142,6 +152,11 @@ async def handle_local_user_deactivated(self, user_id: str) -> None: """Called when a user ID is deactivated""" # FIXME(#3714): We should probably do this in the same worker as all # the other changes. + + # T2B: Disable user directory + if True: + return + await self.store.remove_from_user_dir(user_id) async def _unsafe_process(self) -> None: