Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Delete unreferenced state groups during purge #4006

Merged
merged 12 commits into from
Oct 30, 2018
1 change: 1 addition & 0 deletions changelog.d/4006.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete unreferenced state groups during history purge
157 changes: 110 additions & 47 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
Expand Down Expand Up @@ -205,7 +206,8 @@ def f(self, *args, **kwargs):

# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment on why we are inheriting from StateGroupWorkerStore mightn't hurt; otoh perhaps it's obvious enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its obvious enough, especially since we use it in a fair few places and not just in purge history

BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"

Expand Down Expand Up @@ -2034,55 +2036,37 @@ def _purge_history_txn(

logger.info("[purge] finding redundant state groups")

# Get all state groups that are only referenced by events that are
# to be deleted.
# This works by first getting state groups that we may want to delete,
# joining against event_to_state_groups to get events that use that
# state group, then left joining against events_to_purge again. Any
# state group where the left join produce *no nulls* are referenced
# only by events that are going to be purged.
# Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events
# or state groups, and if not we delete them.
txn.execute("""
SELECT state_group FROM
(
SELECT DISTINCT state_group FROM events_to_purge
INNER JOIN event_to_state_groups USING (event_id)
) AS sp
INNER JOIN event_to_state_groups USING (state_group)
LEFT JOIN events_to_purge AS ep USING (event_id)
GROUP BY state_group
HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
SELECT DISTINCT state_group FROM events_to_purge
INNER JOIN event_to_state_groups USING (event_id)
""")

state_rows = txn.fetchall()
logger.info("[purge] found %i redundant state groups", len(state_rows))

# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])

# Now we get all the state groups that rely on these state groups
logger.info("[purge] finding state groups which depend on redundant"
" state groups")
remaining_state_groups = []
for i in range(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about
# to delete
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=chunk,
retcols=["state_group"],
keyvalues={},
)
remaining_state_groups.extend(
row["state_group"] for row in rows
referenced_state_groups = set(sg for sg, in txn)
logger.info(
"[purge] found %i referenced state groups",
len(referenced_state_groups),
)

# exclude state groups we are about to delete: no point in
# updating them
if row["state_group"] not in state_groups_to_delete
logger.info("[purge] finding state groups that can be deleted")

state_groups_to_delete, remaining_state_groups = (
self._find_unreferenced_groups_during_purge(
txn, referenced_state_groups,
)
)

logger.info(
"[purge] found %i state groups to delete",
len(state_groups_to_delete),
)

logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)

# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
Expand Down Expand Up @@ -2127,11 +2111,11 @@ def _purge_history_txn(
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
state_rows
((sg,) for sg in state_groups_to_delete),
)

logger.info("[purge] removing events from event_to_state_groups")
Expand Down Expand Up @@ -2227,6 +2211,85 @@ def _purge_history_txn(

logger.info("[purge] done")

def _find_unreferenced_groups_during_purge(self, txn, state_groups):
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).

Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.

Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}

# Set of events that we have found to be referenced by events
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search

# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE state_group IN (%s) AND ep.event_id IS NULL
""" % (",".join("?" for _ in current_search),)
txn.execute(sql, list(current_search))

referenced = set(sg for sg, in txn)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group",),
)

prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]

to_delete = state_groups_seen - referenced_groups

to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)

return to_delete, to_dedelta

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 51
SCHEMA_VERSION = 52

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This is needed to efficiently check for unreferenced state groups during
-- purge. Added events_to_state_group(state_group) index
INSERT into background_updates (update_name, progress_json)
VALUES ('event_to_state_groups_sg_index', '{}');
7 changes: 7 additions & 0 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"

def __init__(self, db_conn, hs):
super(StateStore, self).__init__(db_conn, hs)
Expand All @@ -1275,6 +1276,12 @@ def __init__(self, db_conn, hs):
columns=["state_key"],
where_clause="type='m.room.member'",
)
self.register_background_index_update(
self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
index_name="event_to_state_groups_sg_index",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth remembering that we already have this on matrix.org, so would ideally not spend ages building another copy of the index.

table="event_to_state_groups",
columns=["state_group"],
)

def _store_event_state_mappings_txn(self, txn, events_and_contexts):
state_groups = {}
Expand Down