Skip to content

Commit

Permalink
Do deletion in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Nov 18, 2024
1 parent 0aa5a39 commit e58c238
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
25 changes: 18 additions & 7 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -1590,24 +1590,35 @@ async def _delete_old_one_time_keys_task(
that it could still have old OTKs that the client has dropped. This task is scheduled exactly once
by a database schema delta file, and it clears out old one-time-keys that look like they came from libolm.
"""
user = task.result.get("from_user", "") if task.result else ""
last_user = task.result.get("from_user", "") if task.result else ""
while True:
user, rowcount = await self.store.delete_old_otks_for_one_user(user)
if user is None:
# We process users in batches of 100
users, rowcount = await self.store.delete_old_otks_for_next_user_batch(
last_user, 100
)
if len(users) == 0:
# We're done!
return TaskStatus.COMPLETE, None, None

logger.debug("Deleted %i old one-time-keys for user '%s'", rowcount, user)
logger.debug(
"Deleted %i old one-time-keys for users '%s'..'%s'",
rowcount,
users[0],
users[-1],
)
last_user = users[-1]

# Store our progress
await self._task_scheduler.update_task(task.id, result={"from_user": user})
await self._task_scheduler.update_task(
task.id, result={"from_user": last_user}
)

# Sleep a little before doing the next user.
#
# matrix.org has about 15M users in the e2e_one_time_keys_json table
# (comprising 20M devices). We want this to take about a week, so we need
# to do 25 per second.
await self.clock.sleep(0.04)
# to do about one batch of 100 users every 4 seconds.
await self.clock.sleep(4)


def _check_cross_signing_key(
Expand Down
49 changes: 25 additions & 24 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -1453,48 +1453,49 @@ def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
impl,
)

async def delete_old_otks_for_one_user(
self, after_user_id: str
) -> Tuple[Optional[str], int]:
"""Deletes old OTKs belonging to one user.
async def delete_old_otks_for_next_user_batch(
self, after_user_id: str, number_of_users: int
) -> Tuple[List[str], int]:
"""Deletes old OTKs belonging to the next batch of users
Returns:
`(user, rows)`, where:
* `user` is the user ID of the updated user, or None if we are don
`(users, rows)`, where:
* `users` is the user IDs of the updated users. An empty list if we are done.
* `rows` is the number of deleted rows
"""

def impl(txn: LoggingTransaction) -> Tuple[Optional[str], int]:
# Find the next user
def impl(txn: LoggingTransaction) -> Tuple[List[str], int]:
# Find a batch of users
txn.execute(
"""
SELECT user_id FROM e2e_one_time_keys_json WHERE user_id > ? LIMIT 1
SELECT DISTINCT(user_id) FROM e2e_one_time_keys_json
WHERE user_id > ?
ORDER BY user_id
LIMIT ?
""",
(after_user_id,),
(after_user_id, number_of_users),
)
row = txn.fetchone()
if not row:
# We're done!
return None, 0
(user_id,) = row
users = [row[0] for row in txn.fetchall()]
if len(users) == 0:
return users, 0

# Delete any old OTKs belonging to that user.
# Delete any old OTKs belonging to those users.
#
# We only actually consider OTKs whose key ID is 6 characters long. These
# keys were likely made by libolm rather than Vodozemac; libolm only kept
# 100 private OTKs, so was far more vulnerable than Vodozemac to throwing
# away keys prematurely.
txn.execute(
"""
clause, args = make_in_list_sql_clause(txn.database_engine, 'user_id', users)
sql = f"""
DELETE FROM e2e_one_time_keys_json
WHERE user_id = ? AND ts_added_ms < ? AND length(key_id) = 6
""",
(user_id, self._clock.time_msec() - (7 * 24 * 3600 * 1000)),
)
WHERE {clause} AND ts_added_ms < ? AND length(key_id) = 6
"""
args.append(self._clock.time_msec() - (7 * 24 * 3600 * 1000))
txn.execute(sql, args)

return user_id, txn.rowcount
return users, txn.rowcount

return await self.db_pool.runInteraction("delete_old_otks_for_one_user", impl)
return await self.db_pool.runInteraction("delete_old_otks_for_next_user_batch", impl)


class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
Expand Down

0 comments on commit e58c238

Please sign in to comment.