This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add DB bg update to cleanup extremities. #5278

Merged: 8 commits, May 30, 2019
1 change: 1 addition & 0 deletions changelog.d/5278.bugfix
@@ -0,0 +1 @@
Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
2 changes: 2 additions & 0 deletions synapse/storage/__init__.py
@@ -36,6 +36,7 @@
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events import EventsStore
+from .events_bg_updates import EventsBackgroundUpdatesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -66,6 +67,7 @@


 class DataStore(
+    EventsBackgroundUpdatesStore,
     RoomMemberStore,
     RoomStore,
     RegistrationStore,
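Wiring the new store into DataStore only requires adding it to the list of base classes: Synapse's storage classes are mixins whose constructors cooperate through super(), so anything the new class registers in its __init__ is picked up when DataStore is instantiated. A minimal sketch of that composition pattern follows; the class bodies and the update name are simplified, illustrative stand-ins, not the actual Synapse implementations.

class BackgroundUpdateStore:
    """Simplified stand-in for Synapse's background-update store."""

    def __init__(self, db_conn, hs):
        self._background_update_handlers = {}

    def register_background_update_handler(self, update_name, update_handler):
        # The real store runs these handlers later, in small batches.
        self._background_update_handlers[update_name] = update_handler


class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
    def __init__(self, db_conn, hs):
        super().__init__(db_conn, hs)
        # Registering in the mixin's constructor is all that is needed: any
        # class that lists this mixin among its bases inherits the update.
        self.register_background_update_handler(
            "cleanup_extremities", self._cleanup_extremities  # illustrative name
        )

    def _cleanup_extremities(self, progress, batch_size):
        raise NotImplementedError  # the real logic lives in events_bg_updates.py


class DataStore(EventsBackgroundUpdatesStore):
    """Stands in for Synapse's DataStore, which lists many such mixins."""


store = DataStore(db_conn=None, hs=None)
assert "cleanup_extremities" in store._background_update_handlers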
12 changes: 9 additions & 3 deletions synapse/storage/_base.py
@@ -1261,7 +1261,8 @@ def _simple_delete_txn(txn, table, keyvalues):
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)

return txn.execute(sql, list(keyvalues.values()))
txn.execute(sql, list(keyvalues.values()))
return txn.rowcount

def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
return self.runInteraction(
@@ -1280,9 +1281,12 @@ def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
             column : column name to test for inclusion against `iterable`
             iterable : list
             keyvalues : dict of column names and values to select the rows with
+
+        Returns:
+            int: Number rows deleted
         """
         if not iterable:
-            return
+            return 0
 
         sql = "DELETE FROM %s" % table
 
@@ -1297,7 +1301,9 @@ def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):

         if clauses:
             sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
-        return txn.execute(sql, values)
+        txn.execute(sql, values)
+
+        return txn.rowcount
 
     def _get_cache_dict(
         self, db_conn, table, entity_column, stream_column, max_value, limit=100000
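The change to _simple_delete_txn and _simple_delete_many_txn is what lets a caller observe how many rows a batch of deletes actually removed, which a batched cleanup needs in order to report progress and to know when it has finished. A rough, self-contained sketch of that usage follows; the function names and the driver loop are illustrative rather than Synapse code, and only the event_forward_extremities table name and the txn.rowcount idea come from the diff.

import sqlite3


def delete_extremities_txn(txn, room_id, event_ids):
    """Delete the given forward extremities and report how many rows went away."""
    if not event_ids:
        return 0
    placeholders = ", ".join("?" for _ in event_ids)
    txn.execute(
        "DELETE FROM event_forward_extremities"
        " WHERE room_id = ? AND event_id IN (%s)" % (placeholders,),
        [room_id] + list(event_ids),
    )
    # txn.rowcount is exactly what the patched helpers now surface to callers.
    return txn.rowcount


def cleanup_in_batches(conn, room_id, batches):
    """Run one transaction per batch and stop once a batch deletes nothing."""
    total = 0
    for event_ids in batches:
        with conn:  # commit (or roll back) each batch separately
            deleted = delete_extremities_txn(conn.cursor(), room_id, event_ids)
        total += deleted
        if deleted == 0:
            break
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_forward_extremities (event_id TEXT, room_id TEXT)")
conn.executemany(
    "INSERT INTO event_forward_extremities VALUES (?, ?)",
    [("$a", "!room:example.org"), ("$b", "!room:example.org")],
)
conn.commit()
print(cleanup_in_batches(conn, "!room:example.org", [["$a"], ["$b"], ["$c"]]))  # 2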
180 changes: 2 additions & 178 deletions synapse/storage/events.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -219,41 +220,11 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"

def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
self.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender,
)

self.register_background_index_update(
"event_contains_url_index",
index_name="event_contains_url_index",
table="events",
columns=["room_id", "topological_ordering", "stream_ordering"],
where_clause="contains_url = true AND outlier = false",
)

# an event_id index on event_search is useful for the purge_history
# api. Plus it means we get to enforce some integrity with a UNIQUE
# clause
self.register_background_index_update(
"event_search_event_id_idx",
index_name="event_search_event_id_idx",
table="event_search",
columns=["event_id"],
unique=True,
psql_only=True,
)

self._event_persist_queue = _EventPeristenceQueue()

self._state_resolution_handler = hs.get_state_resolution_handler()

@defer.inlineCallbacks
@@ -1579,153 +1550,6 @@ def _count(txn):
ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

INSERT_CLUMP_SIZE = 1000

def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
" INNER JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

rows = txn.fetchall()
if not rows:
return 0

min_stream_id = rows[-1][0]

update_rows = []
for row in rows:
try:
event_id = row[1]
event_json = json.loads(row[2])
sender = event_json["sender"]
content = event_json["content"]

contains_url = "url" in content
if contains_url:
contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue

update_rows.append((sender, contains_url, event_id))

sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows),
}

self._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)

return len(rows)

result = yield self.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)

if not result:
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)

defer.returnValue(result)

@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

INSERT_CLUMP_SIZE = 1000

def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

rows = txn.fetchall()
if not rows:
return 0

min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]

rows_to_update = []

chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)

for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue

rows_to_update.append((origin_server_ts, event_id))

sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows_to_update),
}

self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)

return len(rows_to_update)

result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)

if not result:
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)

defer.returnValue(result)

def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
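The _background_reindex_* methods removed above show the shape every Synapse background update follows: read a batch bounded by the progress dict, do the work inside runInteraction, persist new progress with _background_update_progress_txn, return how much was done, and call _end_background_update once a batch reports nothing left. The new extremities cleanup added by this PR lives in events_bg_updates.py, which is not part of this diff; the sketch below only illustrates that general shape, with an assumed class name, update name, query, and import path for BackgroundUpdateStore, and it elides the actual soft-failed-extremity logic.

from twisted.internet import defer

from synapse.storage.background_updates import BackgroundUpdateStore


class ExtremitiesCleanupStore(BackgroundUpdateStore):
    CLEANUP_NAME = "cleanup_soft_failed_extremities"  # assumed update name

    def __init__(self, db_conn, hs):
        super(ExtremitiesCleanupStore, self).__init__(db_conn, hs)
        self.register_background_update_handler(
            self.CLEANUP_NAME, self._cleanup_extremities_bg_update
        )

    @defer.inlineCallbacks
    def _cleanup_extremities_bg_update(self, progress, batch_size):
        """Handle one batch; the updater calls this until it reports no work."""
        last_room_id = progress.get("last_room_id", "")

        def _cleanup_txn(txn):
            # Pick at most `batch_size` rooms that have not been processed yet.
            txn.execute(
                "SELECT DISTINCT room_id FROM event_forward_extremities"
                " WHERE room_id > ? ORDER BY room_id LIMIT ?",
                (last_room_id, batch_size),
            )
            room_ids = [row[0] for row in txn.fetchall()]
            if not room_ids:
                return 0

            # ... drop the extremities that only point at soft-failed events,
            # using the delete helpers that now return txn.rowcount ...

            self._background_update_progress_txn(
                txn, self.CLEANUP_NAME, {"last_room_id": room_ids[-1]}
            )
            return len(room_ids)

        num_handled = yield self.runInteraction(self.CLEANUP_NAME, _cleanup_txn)

        if not num_handled:
            # Nothing left to process: mark the background update as complete.
            yield self._end_background_update(self.CLEANUP_NAME)

        defer.returnValue(num_handled)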