Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use native UPSERTs where possible #4306

Merged
merged 49 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03f2f75
add pg9.5+ upserts
hawkowl Dec 18, 2018
b0c720e
add sqlite 3.24+ upserts
hawkowl Dec 18, 2018
092034a
changelog
hawkowl Dec 18, 2018
82bd14e
refactor feature checking into the engine
hawkowl Dec 18, 2018
43eef60
fixup pep8
hawkowl Dec 18, 2018
e1dd7fe
fixup sqlite3
hawkowl Dec 18, 2018
5c5061e
don't shadow a global import name
hawkowl Dec 18, 2018
5198352
fix up gitignore
hawkowl Dec 18, 2018
6e2e7ff
black
hawkowl Dec 18, 2018
3ac5bcc
try this?
hawkowl Dec 18, 2018
98b00e6
try this?
hawkowl Dec 18, 2018
1e0008b
try this?
hawkowl Dec 18, 2018
faae2e1
test
hawkowl Dec 21, 2018
fd12570
fix
hawkowl Dec 21, 2018
7200d0a
fix for mau?
hawkowl Dec 21, 2018
0e54122
Merge branch 'develop' into hawkowl/upsert-e2e
hawkowl Dec 21, 2018
cb7fe83
Merge remote-tracking branch 'origin/hawkowl/upsert-e2e' into hawkowl…
hawkowl Dec 21, 2018
76bc34a
small fix
hawkowl Dec 21, 2018
f3a0ab8
fix the client ips batch
hawkowl Dec 27, 2018
1b28793
add method to block upserts until we have built indexes
hawkowl Dec 27, 2018
c97601f
redo best effort
hawkowl Dec 27, 2018
02a52c9
cleanups
hawkowl Dec 27, 2018
54a5db3
Merge remote-tracking branch 'origin/develop' into hawkowl/upsert-e2e
hawkowl Dec 27, 2018
6272d0c
respect blocked native upsert
hawkowl Dec 27, 2018
6233abf
respect blocked native upsert
hawkowl Dec 27, 2018
a912b6d
move this so that slaves can safely upsert too
hawkowl Dec 27, 2018
48c351e
fix failing tests
hawkowl Dec 27, 2018
c79058f
fix lint
hawkowl Dec 27, 2018
ee4a0d8
fix failing tests
hawkowl Dec 27, 2018
7df76f7
Update 4306.misc
hawkowl Dec 28, 2018
7a16615
Merge remote-tracking branch 'origin/develop' into hawkowl/upsert-e2e
hawkowl Dec 31, 2018
cd26a90
add some full schemas + docs
hawkowl Jan 3, 2019
a08154a
add the inserts :/
hawkowl Jan 3, 2019
9ead351
clean up packaging
hawkowl Jan 3, 2019
1c9ef51
cleanups
hawkowl Jan 3, 2019
4e66437
cleanups
hawkowl Jan 3, 2019
aebb75a
merge
hawkowl Jan 18, 2019
b6d7fed
things we don't need
hawkowl Jan 18, 2019
238b8d4
things we don't need
hawkowl Jan 18, 2019
078535a
things we don't need
hawkowl Jan 18, 2019
f92a8b3
Merge remote-tracking branch 'origin/develop' into hawkowl/upsert-e2e
hawkowl Jan 18, 2019
9392c55
revert
hawkowl Jan 18, 2019
abf5bd3
Merge remote-tracking branch 'origin/develop' into hawkowl/upsert-e2e
hawkowl Jan 23, 2019
ee92e1a
remove uses where we care about what an upsert returns
hawkowl Jan 23, 2019
b7c3511
Update MANIFEST.in
hawkowl Jan 23, 2019
d53a60b
Update 4306.misc
hawkowl Jan 23, 2019
1b1d782
update due to review
hawkowl Jan 24, 2019
c542f8c
Merge remote-tracking branch 'origin/develop' into hawkowl/upsert-e2e
hawkowl Jan 24, 2019
e2da664
Merge branch 'hawkowl/upsert-e2e' of ssh://github.com/matrix-org/syna…
hawkowl Jan 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
[run]
branch = True
parallel = True
source = synapse

[paths]
source=
coverage
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
include = synapse/*

[report]
precision = 2
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ homeserver*.pid
*.tls.dh
*.tls.key

.coverage
.coverage.*
!.coverage.rc
.coverage*
coverage.*
!.coveragerc
htmlcov

demo/*/*.db
Expand Down
1 change: 1 addition & 0 deletions changelog.d/4306.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+.
163 changes: 153 additions & 10 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,58 @@ def __init__(self, db_conn, hs):

self.database_engine = hs.database_engine

# We force simple upserts by default, and then enable them once we have
# finished background updates (checked by `_check_safe_to_upsert`).
self._force_simple_upsert = True

if getattr(hs.config, "_enable_native_upserts", True):
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates.
self._clock.call_later(0.0, self._check_safe_to_upsert)

@defer.inlineCallbacks
def _check_safe_to_upsert(self):
"""
Is it safe to use native UPSERT?

If there are background updates, we will need to wait, as they may be
the addition of indexes that set the UNIQUE constraint that we require.

If the background updates have not completed, wait a second and check again.
"""
completed = yield self.has_completed_background_updates()

if completed:
# Now that indexes are built, we are allowed to do native UPSERTs if
# the underlying database supports it.
self._force_simple_upsert = False
else:
self._clock.call_later(1.0, self._check_safe_to_upsert)

@defer.inlineCallbacks
def has_completed_background_updates(self):
"""
Check if all the background updates have completed. This is safe to run
on the master as well as slaves, and will be overridden by
background_updates.BackgroundUpdateStore if we are the master and have a
subclass with it.

Returns: Deferred[bool]: True if all background updates have completed
"""
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
updates = yield self._simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
desc="check_background_updates",
)
if not updates:
defer.returnValue(True)

defer.returnValue(False)

def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()

Expand Down Expand Up @@ -494,8 +546,16 @@ def _simple_insert_many_txn(txn, table, values):
txn.executemany(sql, vals)

@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
def _simple_upsert(
self,
table,
keyvalues,
values,
insertion_values={},
desc="_simple_upsert",
lock=True,
best_effort=False,
):
"""

`lock` should generally be set to True (the default), but can be set
Expand All @@ -515,6 +575,7 @@ def _simple_upsert(self, table, keyvalues, values,
insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
best_effort (bool): If we run into a transaction error, do we stop trying?
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
Returns:
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
Expand All @@ -524,8 +585,13 @@ def _simple_upsert(self, table, keyvalues, values,
try:
result = yield self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
self._simple_upsert_txn,
table,
keyvalues,
values,
insertion_values,
lock=lock,
best_effort=best_effort,
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
Expand All @@ -537,12 +603,45 @@ def _simple_upsert(self, table, keyvalues, values,

# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
"%s when upserting into %s; retrying: %s", e.__name__, table, e
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
)

def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True):
def _simple_upsert_txn(
self,
txn,
table,
keyvalues,
values,
insertion_values={},
lock=True,
best_effort=False,
):
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
"""
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
if self.database_engine.can_native_upsert and not self._force_simple_upsert:
return self._simple_upsert_txn_native_upsert(
txn,
table,
keyvalues,
values,
insertion_values=insertion_values,
best_effort=best_effort,
)
else:
return self._simple_upsert_txn_emulated(
txn,
table,
keyvalues,
values,
insertion_values=insertion_values,
lock=lock,
)

def _simple_upsert_txn_emulated(
self, txn, table, keyvalues, values, insertion_values={}, lock=True
):
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
Expand All @@ -551,7 +650,7 @@ def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
" AND ".join("%s = ?" % (k,) for k in keyvalues)
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)
sqlargs = list(values.values()) + list(keyvalues.values())

Expand All @@ -569,12 +668,56 @@ def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
", ".join("?" for _ in allvalues),
)
txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True

def _simple_upsert_txn_native_upsert(
self, txn, table, keyvalues, values, insertion_values={}, best_effort=False
):
"""
Use the native UPSERT functionality in recent PostgreSQL versions.
"""
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)

sql = (
"INSERT INTO %s (%s) VALUES (%s) "
"ON CONFLICT (%s) DO UPDATE SET %s "
"RETURNING (xmax = 0) AS inserted"
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
) % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues),
", ".join(k + "=EXCLUDED." + k for k in values),
)
try:
txn.execute(sql, list(allvalues.values()))
except self.database_engine.module.OperationalError as e:
# We only care about serialization errors, so check for it
if e.args[0] == "could not serialize access due to concurrent update":
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
# A concurrent update problem is when we try and do a native
# UPSERT but the row has changed from under us. We can either
# retry, or give up if asked to do so.
if best_effort:
# If it's a concurrent-update problem, and this is marked as
# 'best effort' (i.e. if there's a race, then the one we
# raced with will suffice), then pretend that we succeeded.
return False
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
else:
# Otherwise, raise, because it's a real OperationalError and we
# will need to be rolled back and retried.
raise

# One-tuple, which is a boolean for insertion or not
res = txn.fetchone()
return res[0]

def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
Expand Down
12 changes: 11 additions & 1 deletion synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def __init__(self, db_conn, hs):
columns=["last_seen"],
)

# Register a unique index
self.register_background_index_update(
"user_ips_device_unique_index",
index_name="user_ips_device_unique_id",
table="user_ips",
columns=["user_id", "access_token", "ip", "user_agent", "device_id"],
unique=True,
)

# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

Expand Down Expand Up @@ -114,7 +123,8 @@ def update():
)

def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")
if self._force_simple_upsert or (not self.database_engine.can_native_upsert):
self.database_engine.lock_table(txn, "user_ips")

for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
from .sqlite import Sqlite3Engine

SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
Expand Down
19 changes: 19 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ def convert_param_style(self, sql):
return sql.replace("?", "%s")

def on_new_connection(self, db_conn):

# Get the version of PostgreSQL that we're using. As per the psycopg2
# docs: The number is formed by converting the major, minor, and
# revision numbers into two-decimal-digit numbers and appending them
# together. For example, version 8.1.5 will be returned as 80105.
server_version = str(db_conn.server_version)
self._version = (
int(server_version[:-4], 10),
int(server_version[-4:-2], 10),
int(server_version[-2:], 10)
)
hawkowl marked this conversation as resolved.
Show resolved Hide resolved

db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
Expand All @@ -54,6 +66,13 @@ def on_new_connection(self, db_conn):

cursor.close()

@property
def can_native_upsert(self):
"""
Can we use native UPSERTs? This requires PostgreSQL 9.5+.
"""
return self._version >= (9, 5, 0)

def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ def __init__(self, database_module, database_config):
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()

@property
def can_native_upsert(self):
"""
Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
more work we haven't done yet to tell what was inserted vs updated.
"""
# SQLite currently doesn't have a way of telling if an UPSERT ended up
# with an INSERT or an UPDATE
# To enable, uncomment:
#
# from sqlite3 import sqlite_version_info
# return sqlite_version_info >= (3, 24, 0)

return False
hawkowl marked this conversation as resolved.
Show resolved Hide resolved

def check_database(self, txn):
pass

Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
# See https://github.com/matrix-org/synapse/issues/3854 for more.
# If we support native upserts, we'll not lock, but also not retry
# on any races, by setting best_effort=True.
is_insert = self._simple_upsert_txn(
txn,
table="monthly_active_users",
Expand All @@ -247,6 +249,7 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
values={
"timestamp": int(self._clock.time_msec()),
},
best_effort=True,
)

return is_insert
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 53
SCHEMA_VERSION = 54

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/schema/delta/54/user_ips_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- add a new unique index to user_ips table
INSERT INTO background_updates (update_name, progress_json) VALUES
('user_ips_device_unique_index', '{}');
Loading