Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve searching in the userdir #4846

Merged
merged 16 commits into from
Mar 14, 2019
1 change: 1 addition & 0 deletions changelog.d/4846.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
1 change: 1 addition & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def setup(config_options):
logger.info("Database prepared in %s.", config.database_config['name'])

hs.setup()
hs.setup_master()

@defer.inlineCallbacks
def do_acme():
Expand Down
8 changes: 7 additions & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def __init__(self, hs):
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users

# If we're a worker, don't sleep when doing the initial room work, as it
# won't monopolise the master's CPU.
if hs.config.worker_app:
self.INITIAL_ROOM_SLEEP_MS = 0
self.INITIAL_USER_SLEEP_MS = 0

# When we start up for the first time we need to populate the user_directory.
# This is a set of user_ids we've inserted already
self.initially_handled_users = set()
Expand Down Expand Up @@ -231,7 +237,7 @@ def _handle_initial_room(self, room_id):
unhandled_users = user_ids - self.initially_handled_users

yield self.store.add_profiles_to_user_dir(
{user_id: users_with_profile[user_id] for user_id in unhandled_users},
{user_id: users_with_profile[user_id] for user_id in unhandled_users}
)

self.initially_handled_users |= unhandled_users
Expand Down
8 changes: 8 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ def build_DEPENDENCY(self)
'registration_handler',
]

REQUIRED_ON_MASTER_STARTUP = [
"user_directory_handler",
]

# This is overridden in derived application classes
# (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
# instantiated during setup() for future return by get_datastore()
Expand Down Expand Up @@ -221,6 +225,10 @@ def setup(self):
conn.commit()
logger.info("Finished setting up.")

def setup_master(self):
    """Call the ``get_<name>`` accessor for each required startup dependency.

    The names come from ``self.REQUIRED_ON_MASTER_STARTUP`` and are visited
    in list order. The values returned by the accessors are discarded.
    """
    for dependency_name in self.REQUIRED_ON_MASTER_STARTUP:
        accessor = getattr(self, "get_" + dependency_name)
        accessor()
hawkowl marked this conversation as resolved.
Show resolved Hide resolved

def get_reactor(self):
"""
Fetch the Twisted reactor in use by this HomeServer.
Expand Down
13 changes: 10 additions & 3 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,18 +767,25 @@ def _simple_upsert_txn_native_upsert(
"""
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)

if not values:
latter = "NOTHING"
else:
allvalues.update(values)
latter = (
"UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
)

sql = (
"INSERT INTO %s (%s) VALUES (%s) "
"ON CONFLICT (%s) DO UPDATE SET %s"
"ON CONFLICT (%s) DO %s"
) % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues),
", ".join(k + "=EXCLUDED." + k for k in values),
latter
)
txn.execute(sql, list(allvalues.values()))

Expand Down
3 changes: 0 additions & 3 deletions synapse/storage/schema/delta/53/user_share.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
-- Old disused version of the tables below.
DROP TABLE IF EXISTS users_who_share_rooms;

-- This is no longer used because it's duplicated by the users_who_share_public_rooms
DROP TABLE IF EXISTS users_in_public_rooms;

-- Tables keeping track of what users share rooms. This is a map of local users
-- to local or remote users, per room. Remote users cannot be in the user_id
-- column, only the other_user_id column. There are two tables, one for public
Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/schema/delta/53/users_in_public_rooms.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- We don't need the old version of this table.
DROP TABLE IF EXISTS users_in_public_rooms;

-- Track what users are in public rooms.
CREATE TABLE IF NOT EXISTS users_in_public_rooms (
user_id TEXT NOT NULL
);
hawkowl marked this conversation as resolved.
Show resolved Hide resolved

CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id);

-- Fill the table.
INSERT INTO background_updates (update_name, progress_json) VALUES
('users_in_public_rooms_initial', '{}');
86 changes: 79 additions & 7 deletions synapse/storage/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,57 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks

from ._base import SQLBaseStore

logger = logging.getLogger(__name__)


class UserDirectoryStore(SQLBaseStore):
class UserDirectoryStore(BackgroundUpdateStore):
def __init__(self, dbconn, hs):
# Forward the constructor arguments unchanged to the parent class.
super(UserDirectoryStore, self).__init__(dbconn, hs)

# Handle the "users_in_public_rooms_initial" background update with
# _populate_users_in_public_rooms. The same update name is inserted into
# background_updates by the users_in_public_rooms.sql schema delta.
self.register_background_update_handler(
"users_in_public_rooms_initial", self._populate_users_in_public_rooms
)
hawkowl marked this conversation as resolved.
Show resolved Hide resolved

@defer.inlineCallbacks
def _populate_users_in_public_rooms(self, progress, batch_size):
    """
    Populate the users_in_public_rooms table with the contents of the
    users_who_share_public_rooms table.

    All of the work is done in one pass: every distinct other_user_id is
    read in a single query and then upserted in a single interaction.

    Args:
        progress: accepted for the background update handler signature;
            not used here.
        batch_size: accepted for the background update handler signature;
            not used here.

    Returns:
        Deferred: resolves to 1 after _end_background_update has been
        called for "users_in_public_rooms_initial".
    """

    def _fetch(txn):
        sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms"
        txn.execute(sql)
        return txn.fetchall()

    users = yield self.runInteraction(
        "populate_users_in_public_rooms_fetch", _fetch
    )

    # Skip the write entirely when the source table yielded no rows.
    if users:
        def _fill(txn):
            # Each fetched row is a one-column row, used as-is as the key
            # values for the single "user_id" key column.
            self._simple_upsert_many_txn(
                txn,
                table="users_in_public_rooms",
                key_names=["user_id"],
                key_values=users,
                value_names=(),
                value_values=None,
            )

        # _fill returns nothing, so its result is deliberately not bound
        # (the original rebound ``users`` to it, which was dead and
        # misleading).
        yield self.runInteraction(
            "populate_users_in_public_rooms_fill", _fill
        )

    yield self._end_background_update("users_in_public_rooms_initial")
    defer.returnValue(1)

@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
Expand Down Expand Up @@ -241,6 +281,9 @@ def _remove_from_user_dir_txn(txn):
self._simple_delete_txn(
txn, table="user_directory_search", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
txn,
table="users_who_share_public_rooms",
Expand Down Expand Up @@ -339,6 +382,20 @@ def _add_users_who_share_room_txn(txn):
value_names=(),
value_values=None,
)

# If it's a public room, also update them in users_in_public_rooms.
# We don't check whether they're already in the table before we do it, as it's
# more efficient to simply have Postgres do that (one UPSERT vs one
# SELECT and maybe one INSERT).
if not share_private:
for user_id in set([x[1] for x in user_id_tuples]):
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
self._simple_upsert_txn(
txn,
"users_in_public_rooms",
keyvalues={"user_id": user_id},
values={},
)

for user_id, other_user_id in user_id_tuples:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
Expand Down Expand Up @@ -379,6 +436,21 @@ def _remove_user_who_share_room_txn(txn):
table="users_who_share_public_rooms",
keyvalues={"other_user_id": user_id, "room_id": room_id},
)

# Are the users still in a public room after we deleted them from this one?
still_in_public = self._simple_select_one_onecol_txn(
txn,
"users_who_share_public_rooms",
keyvalues={"other_user_id": user_id},
retcol="other_user_id",
allow_none=True,
)

if still_in_public is None:
self._simple_delete_txn(
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
)

txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
Expand Down Expand Up @@ -452,6 +524,7 @@ def delete_all_from_user_dir(self):
def _delete_all_from_user_dir_txn(txn):
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
txn.execute("DELETE FROM users_in_public_rooms")
txn.execute("DELETE FROM users_who_share_public_rooms")
txn.execute("DELETE FROM users_who_share_private_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all)
Expand Down Expand Up @@ -568,15 +641,14 @@ def search_user_dir(self, user_id, search_term, limit):
where_clause = "1=1"
else:
join_clause = """
LEFT JOIN users_in_public_rooms AS p USING (user_id)
LEFT JOIN (
SELECT other_user_id AS user_id FROM users_who_share_public_rooms
UNION
SELECT other_user_id AS user_id FROM users_who_share_private_rooms
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
WHERE user_id = ?
) AS p USING (user_id)
) AS s USING (user_id)
"""
join_args = (user_id,)
where_clause = "p.user_id IS NOT NULL"
where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"

if isinstance(self.database_engine, PostgresEngine):
full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
Expand Down
20 changes: 20 additions & 0 deletions tests/handlers/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,13 @@ def test_private_room(self):
# Check we have populated the database correctly.
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()

self.assertEqual(shares_public, [])
self.assertEqual(
self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)])
)
self.assertEqual(public_users, [])

# We get one search result when searching for user2 by user1.
s = self.get_success(self.handler.search_users(u1, "user2", 10))
Expand All @@ -140,9 +142,11 @@ def test_private_room(self):
# Check we have removed the values.
shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()

self.assertEqual(shares_public, [])
self.assertEqual(self._compress_shared(shares_private), set())
self.assertEqual(public_users, [])

# User1 now gets no search results for any of the other users.
s = self.get_success(self.handler.search_users(u1, "user2", 10))
Expand All @@ -160,6 +164,15 @@ def _compress_shared(self, shared):
r.add((i["user_id"], i["other_user_id"], i["room_id"]))
return r

def get_users_in_public_rooms(self):
    """Return the ``user_id`` column of the users_in_public_rooms table.

    The select is issued through ``self.store._simple_select_onecol`` with
    ``None`` as its second argument, and the result is unwrapped with
    ``self.get_success``.
    """
    pending = self.store._simple_select_onecol(
        "users_in_public_rooms", None, "user_id"
    )
    return self.get_success(pending)

def get_users_who_share_public_rooms(self):
return self.get_success(
self.store._simple_select_list(
Expand Down Expand Up @@ -202,9 +215,12 @@ def test_initial(self):

shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()

# Nothing updated yet
self.assertEqual(shares_private, [])
self.assertEqual(shares_public, [])
self.assertEqual(public_users, [])

# Reset the handled users caches
self.handler.initially_handled_users = set()
Expand All @@ -221,6 +237,7 @@ def test_initial(self):

shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()

# User 1 and User 2 share public rooms
self.assertEqual(
Expand All @@ -233,6 +250,9 @@ def test_initial(self):
set([(u1, u3, private_room), (u3, u1, private_room)]),
)

# User 1 and 2 are in public rooms
self.assertEqual(set(public_users), set([u1, u2]))

def test_search_all_users(self):
"""
Search all users = True means that a user does not have to share a
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ def cleanup():
cleanup_func(cleanup)

hs.setup()
if homeserverToUse.__name__ == "TestHomeServer":
hs.setup_master()
else:
hs = homeserverToUse(
name,
Expand Down