Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rebuild other indexes using stream_ordering #10282

Merged
merged 1 commit into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10282.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
50 changes: 47 additions & 3 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@
logger = logging.getLogger(__name__)


_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
# there should be no leftover rows without a stream_ordering2, but just in case...
"UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
# finally, we can drop the rule and switch the columns
# now we can drop the rule and switch the columns
"DROP RULE populate_stream_ordering2 ON events",
"ALTER TABLE events DROP COLUMN stream_ordering",
"ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
# ... and finally, rename the indexes into place for consistency with sqlite
"ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
"ALTER INDEX events_order_room2 RENAME TO events_order_room",
"ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
"ALTER INDEX events_ts2 RENAME TO events_ts",
)


Expand All @@ -45,6 +50,10 @@ class _BackgroundUpdates:
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"


Expand Down Expand Up @@ -155,24 +164,59 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._purged_chain_cover_index,
)

################################################################################

# bg updates for replacing stream_ordering with a BIGINT
# (these only run on postgres.)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
self._background_populate_stream_ordering2,
)
# CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
index_name="events_stream_ordering",
table="events",
columns=["stream_ordering2"],
unique=True,
)
# CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
index_name="event_contains_url_index2",
table="events",
columns=["room_id", "topological_ordering", "stream_ordering2"],
where_clause="contains_url = true AND outlier = false",
)
# CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
index_name="events_order_room2",
table="events",
columns=["room_id", "topological_ordering", "stream_ordering2"],
)
# CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
index_name="events_room_stream2",
table="events",
columns=["room_id", "stream_ordering2"],
)
# CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
index_name="events_ts2",
table="events",
columns=["origin_server_ts", "stream_ordering2"],
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
self._background_replace_stream_ordering_column,
)

################################################################################

async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
Expand Down Expand Up @@ -1098,7 +1142,7 @@ async def _background_replace_stream_ordering_column(
"""Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""

def process(txn: Cursor) -> None:
for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
logger.info("completing stream_ordering migration: %s", sql)
txn.execute(sql)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6001, 'populate_stream_ordering2', '{}');

-- ... and another to build an index on it
-- ... and some more to build indexes on it. These aren't really interdependent
-- but the backround_updates manager can only handle a single dependency per update.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
(6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
(6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
(6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
(6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');

-- ... and another to do the switcheroo
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
(6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');