Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove remaining usage of cursor_to_dict. #16564

Merged
merged 12 commits into from
Oct 31, 2023
2 changes: 1 addition & 1 deletion synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
start, limit, user_id
)
for media in media_ids:
writer.write_media_id(media["media_id"], media)
writer.write_media_id(media.media_id, attr.asdict(media))

logger.info(
"[%s] Written %d media_ids of %s",
Expand Down
43 changes: 21 additions & 22 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
RequestSendFailed,
SynapseError,
)
from synapse.storage.databases.main.room import LargestRoomStats
from synapse.types import JsonDict, JsonMapping, ThirdPartyInstanceID
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -170,26 +171,24 @@ async def _get_public_room_list(
ignore_non_federatable=from_federation,
)

def build_room_entry(room: JsonDict) -> JsonDict:
def build_room_entry(room: LargestRoomStats) -> JsonDict:
entry = {
"room_id": room["room_id"],
"name": room["name"],
"topic": room["topic"],
"canonical_alias": room["canonical_alias"],
"num_joined_members": room["joined_members"],
"avatar_url": room["avatar"],
"world_readable": room["history_visibility"]
"room_id": room.room_id,
"name": room.name,
"topic": room.topic,
"canonical_alias": room.canonical_alias,
"num_joined_members": room.joined_members,
"avatar_url": room.avatar,
"world_readable": room.history_visibility
== HistoryVisibility.WORLD_READABLE,
"guest_can_join": room["guest_access"] == "can_join",
"join_rule": room["join_rules"],
"room_type": room["room_type"],
"guest_can_join": room.guest_access == "can_join",
"join_rule": room.join_rules,
"room_type": room.room_type,
}

# Filter out Nones – rather omit the field altogether
return {k: v for k, v in entry.items() if v is not None}

results = [build_room_entry(r) for r in results]

response: JsonDict = {}
num_results = len(results)
if limit is not None:
Expand All @@ -212,33 +211,33 @@ def build_room_entry(room: JsonDict) -> JsonDict:
# If there was a token given then we assume that there
# must be previous results.
response["prev_batch"] = RoomListNextBatch(
last_joined_members=initial_entry["num_joined_members"],
last_room_id=initial_entry["room_id"],
last_joined_members=initial_entry.joined_members,
last_room_id=initial_entry.room_id,
direction_is_forward=False,
).to_token()

if more_to_come:
response["next_batch"] = RoomListNextBatch(
last_joined_members=final_entry["num_joined_members"],
last_room_id=final_entry["room_id"],
last_joined_members=final_entry.joined_members,
last_room_id=final_entry.room_id,
direction_is_forward=True,
).to_token()
else:
if has_batch_token:
response["next_batch"] = RoomListNextBatch(
last_joined_members=final_entry["num_joined_members"],
last_room_id=final_entry["room_id"],
last_joined_members=final_entry.joined_members,
last_room_id=final_entry.room_id,
direction_is_forward=True,
).to_token()

if more_to_come:
response["prev_batch"] = RoomListNextBatch(
last_joined_members=initial_entry["num_joined_members"],
last_room_id=initial_entry["room_id"],
last_joined_members=initial_entry.joined_members,
last_room_id=initial_entry.room_id,
direction_is_forward=False,
).to_token()

response["chunk"] = results
response["chunk"] = [build_room_entry(r) for r in results]

response["total_room_count_estimate"] = await self.store.count_public_rooms(
network_tuple,
Expand Down
26 changes: 13 additions & 13 deletions synapse/handlers/room_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,24 +703,24 @@ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDic
# there should always be an entry
assert stats is not None, "unable to retrieve stats for %s" % (room_id,)

entry = {
"room_id": stats["room_id"],
"name": stats["name"],
"topic": stats["topic"],
"canonical_alias": stats["canonical_alias"],
"num_joined_members": stats["joined_members"],
"avatar_url": stats["avatar"],
"join_rule": stats["join_rules"],
entry: JsonDict = {
"room_id": stats.room_id,
"name": stats.name,
"topic": stats.topic,
"canonical_alias": stats.canonical_alias,
"num_joined_members": stats.joined_members,
"avatar_url": stats.avatar,
"join_rule": stats.join_rules,
"world_readable": (
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
stats.history_visibility == HistoryVisibility.WORLD_READABLE
),
"guest_can_join": stats["guest_access"] == "can_join",
"room_type": stats["room_type"],
"guest_can_join": stats.guest_access == "can_join",
"room_type": stats.room_type,
}

if self._msc3266_enabled:
entry["im.nheko.summary.version"] = stats["version"]
entry["im.nheko.summary.encryption"] = stats["encryption"]
entry["im.nheko.summary.version"] = stats.version
entry["im.nheko.summary.encryption"] = stats.encryption

# Federation requests need to provide additional information so the
# requested server is able to filter the response appropriately.
Expand Down
6 changes: 4 additions & 2 deletions synapse/rest/admin/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Tuple

import attr

from synapse.api.constants import Direction
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import HttpServer
Expand Down Expand Up @@ -418,7 +420,7 @@ async def on_GET(
start, limit, user_id, order_by, direction
)

ret = {"media": media, "total": total}
ret = {"media": [attr.asdict(m) for m in media], "total": total}
if (start + limit) < total:
ret["next_token"] = start + len(media)

Expand Down Expand Up @@ -477,7 +479,7 @@ async def on_DELETE(
)

deleted_media, total = await self.media_repository.delete_local_media_ids(
[row["media_id"] for row in media]
[m.media_id for m in media]
)

return HTTPStatus.OK, {"deleted_media": deleted_media, "total": total}
Expand Down
13 changes: 12 additions & 1 deletion synapse/rest/admin/registration_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,18 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
valid = parse_boolean(request, "valid")
token_list = await self.store.get_registration_tokens(valid)
return HTTPStatus.OK, {"registration_tokens": token_list}
return HTTPStatus.OK, {
"registration_tokens": [
{
"token": t[0],
"uses_allowed": t[1],
"pending": t[2],
"completed": t[3],
"expiry_time": t[4],
}
for t in token_list
]
}


class NewRegistrationTokenRestServlet(RestServlet):
Expand Down
11 changes: 8 additions & 3 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from urllib import parse as urlparse

import attr

from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.filtering import Filter
Expand Down Expand Up @@ -306,10 +308,13 @@ async def on_GET(
raise NotFoundError("Room not found")

members = await self.store.get_users_in_room(room_id)
ret["joined_local_devices"] = await self.store.count_devices_by_users(members)
ret["forgotten"] = await self.store.is_locally_forgotten_room(room_id)
result = attr.asdict(ret)
result["joined_local_devices"] = await self.store.count_devices_by_users(
members
)
result["forgotten"] = await self.store.is_locally_forgotten_room(room_id)
clokep marked this conversation as resolved.
Show resolved Hide resolved

return HTTPStatus.OK, ret
return HTTPStatus.OK, result

async def on_DELETE(
self, request: SynapseRequest, room_id: str
Expand Down
47 changes: 33 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,52 +1620,71 @@ async def _remove_duplicate_outbound_pokes(
#
# For each duplicate, we delete all the existing rows and put one back.

KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
last_row = progress.get(
"last_row",
{"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
)

def _txn(txn: LoggingTransaction) -> int:
clause, args = make_tuple_comparison_clause(
[(x, last_row[x]) for x in KEY_COLS]
[
("stream_id", last_row["stream_id"]),
("destination", last_row["destination"]),
("user_id", last_row["user_id"]),
("device_id", last_row["device_id"]),
]
)
sql = """
SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
FROM device_lists_outbound_pokes
WHERE %s
GROUP BY %s
GROUP BY stream_id, destination, user_id, device_id
HAVING count(*) > 1
ORDER BY %s
ORDER BY stream_id, destination, user_id, device_id
LIMIT ?
""" % (
clause, # WHERE
clokep marked this conversation as resolved.
Show resolved Hide resolved
",".join(KEY_COLS), # GROUP BY
",".join(KEY_COLS), # ORDER BY
)
txn.execute(sql, args + [batch_size])
rows = self.db_pool.cursor_to_dict(txn)
rows = txn.fetchall()

row = None
for row in rows:
stream_id, destination, user_id, device_id = None, None, None, None
for stream_id, destination, user_id, device_id, _ in rows:
self.db_pool.simple_delete_txn(
txn,
"device_lists_outbound_pokes",
{x: row[x] for x in KEY_COLS},
{
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
},
)

row["sent"] = False
self.db_pool.simple_insert_txn(
txn,
"device_lists_outbound_pokes",
row,
{
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
"sent": False,
},
)

if row:
if rows:
self.db_pool.updates._background_update_progress_txn(
txn,
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES,
{"last_row": row},
{
"last_row": {
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
}
},
)

return len(rows)
Expand Down
48 changes: 37 additions & 11 deletions synapse/storage/databases/main/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
cast,
)

import attr

from synapse.api.constants import Direction
from synapse.logging.opentracing import trace
from synapse.media._base import ThumbnailInfo
Expand All @@ -45,6 +47,18 @@
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class LocalMedia:
media_id: str
media_type: str
media_length: int
upload_name: str
created_ts: int
last_access_ts: int
quarantined_by: Optional[str]
safe_from_quarantine: bool


class MediaSortOrder(Enum):
"""
Enum to define the sorting method used when returning media with
Expand Down Expand Up @@ -180,7 +194,7 @@ async def get_local_media_by_user_paginate(
user_id: str,
order_by: str = MediaSortOrder.CREATED_TS.value,
direction: Direction = Direction.FORWARDS,
) -> Tuple[List[Dict[str, Any]], int]:
) -> Tuple[List[LocalMedia], int]:
"""Get a paginated list of metadata for a local piece of media
which an user_id has uploaded

Expand All @@ -197,7 +211,7 @@ async def get_local_media_by_user_paginate(

def get_local_media_by_user_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[Dict[str, Any]], int]:
) -> Tuple[List[LocalMedia], int]:
# Set ordering
order_by_column = MediaSortOrder(order_by).value

Expand All @@ -217,14 +231,14 @@ def get_local_media_by_user_paginate_txn(

sql = """
SELECT
"media_id",
"media_type",
"media_length",
"upload_name",
"created_ts",
"last_access_ts",
"quarantined_by",
"safe_from_quarantine"
media_id,
media_type,
media_length,
upload_name,
created_ts,
last_access_ts,
quarantined_by,
safe_from_quarantine
FROM local_media_repository
WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC
Expand All @@ -236,7 +250,19 @@ def get_local_media_by_user_paginate_txn(

args += [limit, start]
txn.execute(sql, args)
media = self.db_pool.cursor_to_dict(txn)
media = [
LocalMedia(
media_id=row[0],
media_type=row[1],
media_length=row[2],
upload_name=row[3],
created_ts=row[4],
last_access_ts=row[5],
quarantined_by=row[6],
safe_from_quarantine=bool(row[7]),
)
for row in txn
]
return media, count

return await self.db_pool.runInteraction(
Expand Down
Loading