Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix the inbound PDU metric
Browse files Browse the repository at this point in the history
This broke in #10272
  • Loading branch information
erikjohnston committed Jun 30, 2021
1 parent d561367 commit 7f96eff
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 27 deletions.
37 changes: 20 additions & 17 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,22 +368,21 @@ async def process_pdus_for_room(room_id: str):

async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id
with pdu_process_time.time():
with nested_logging_context(event_id):
try:
await self._handle_received_pdu(origin, pdu)
return {}
except FederationError as e:
logger.warning("Error handling PDU %s: %s", event_id, e)
return {"error": str(e)}
except Exception as e:
f = failure.Failure()
logger.error(
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
return {"error": str(e)}
with nested_logging_context(event_id):
try:
await self._handle_received_pdu(origin, pdu)
return {}
except FederationError as e:
logger.warning("Error handling PDU %s: %s", event_id, e)
return {"error": str(e)}
except Exception as e:
f = failure.Failure()
logger.error(
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
return {"error": str(e)}

await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
Expand Down Expand Up @@ -909,9 +908,13 @@ async def _process_incoming_pdus_in_room_inner(
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)

await self.store.remove_received_event_from_staging(
received_ts = await self.store.remove_received_event_from_staging(
origin, event.event_id
)
if received_ts is not None:
pdu_process_time.observe(
(self._clock.time_msec() - received_ts) / 1000
)

# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
Expand Down
66 changes: 56 additions & 10 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,16 +1075,62 @@ async def remove_received_event_from_staging(
self,
origin: str,
event_id: str,
) -> None:
"""Remove the given event from the staging area"""
await self.db_pool.simple_delete(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
desc="remove_received_event_from_staging",
)
) -> Optional[int]:
"""Remove the given event from the staging area.
Returns:
The received_ts of the row that was deleted, if any.
"""
if self.db_pool.engine.supports_returning:

def _remove_received_event_from_staging_txn(txn):
sql = """
DELETE FROM federation_inbound_events_staging
WHERE origin = ? AND event_id = ?
RETURNING received_ts
"""

txn.execute(sql, (origin, event_id))
return txn.fetchone()

row = await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
db_autocommit=True,
)
if row is None:
return None

return row[0]

else:

def _remove_received_event_from_staging_txn(txn):
received_ts = self.db_pool.simple_select_one_onecol_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
retcol="received_ts",
allow_none=True,
)
self.db_pool.simple_delete_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
)

return received_ts

return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
)

async def get_next_staged_event_id_for_room(
self,
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ def supports_using_any_list(self) -> bool:
"""
...

@property
@abc.abstractmethod
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
...

@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return True

@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return True

def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return False

@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return self.module.sqlite_version_info >= (3, 35, 0)

def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
Expand Down

0 comments on commit 7f96eff

Please sign in to comment.