-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix MultiWriteIdGenerator's handling of restarts. #8374
Changes from all commits
81dac92
f5a91ab
c08456d
6e3d562
d6f5c72
e83800f
d86f62e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix theoretical race condition where events are not sent down `/sync` if the synchrotron worker is restarted without restarting other workers. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* Copyright 2020 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
CREATE TABLE stream_positions ( | ||
stream_name TEXT NOT NULL, | ||
instance_name TEXT NOT NULL, | ||
stream_id BIGINT NOT NULL | ||
); | ||
|
||
CREATE UNIQUE INDEX stream_positions_idx ON stream_positions(stream_name, instance_name); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import attr | ||
from typing_extensions import Deque | ||
|
||
from synapse.metrics.background_process_metrics import run_as_background_process | ||
from synapse.storage.database import DatabasePool, LoggingTransaction | ||
from synapse.storage.util.sequence import PostgresSequenceGenerator | ||
|
||
|
@@ -184,12 +185,16 @@ class MultiWriterIdGenerator: | |
Args: | ||
db_conn | ||
db | ||
stream_name: A name for the stream. | ||
instance_name: The name of this instance. | ||
table: Database table associated with stream. | ||
instance_column: Column that stores the row's writer's instance name | ||
id_column: Column that stores the stream ID. | ||
sequence_name: The name of the postgres sequence used to generate new | ||
IDs. | ||
writers: A list of known writers to use to populate current positions | ||
on startup. Can be empty if nothing uses `get_current_token` or | ||
`get_positions` (e.g. caches stream). | ||
positive: Whether the IDs are positive (true) or negative (false). | ||
When using negative IDs we go backwards from -1 to -2, -3, etc. | ||
""" | ||
|
@@ -198,16 +203,20 @@ def __init__( | |
self, | ||
db_conn, | ||
db: DatabasePool, | ||
stream_name: str, | ||
instance_name: str, | ||
table: str, | ||
instance_column: str, | ||
id_column: str, | ||
sequence_name: str, | ||
writers: List[str], | ||
positive: bool = True, | ||
): | ||
self._db = db | ||
self._stream_name = stream_name | ||
self._instance_name = instance_name | ||
self._positive = positive | ||
self._writers = writers | ||
self._return_factor = 1 if positive else -1 | ||
|
||
# We lock as some functions may be called from DB threads. | ||
|
@@ -216,9 +225,7 @@ def __init__( | |
# Note: If we are a negative stream then we still store all the IDs as | ||
# positive to make life easier for us, and simply negate the IDs when we | ||
# return them. | ||
self._current_positions = self._load_current_ids( | ||
db_conn, table, instance_column, id_column | ||
) | ||
self._current_positions = {} # type: Dict[str, int] | ||
|
||
# Set of local IDs that we're still processing. The current position | ||
# should be less than the minimum of this set (if not empty). | ||
|
@@ -251,30 +258,80 @@ def __init__( | |
|
||
self._sequence_gen = PostgresSequenceGenerator(sequence_name) | ||
|
||
# This goes and fills out the above state from the database. | ||
self._load_current_ids(db_conn, table, instance_column, id_column) | ||
|
||
def _load_current_ids( | ||
self, db_conn, table: str, instance_column: str, id_column: str | ||
) -> Dict[str, int]: | ||
# If positive stream aggregate via MAX. For negative stream use MIN | ||
# *and* negate the result to get a positive number. | ||
sql = """ | ||
SELECT %(instance)s, %(agg)s(%(id)s) FROM %(table)s | ||
GROUP BY %(instance)s | ||
""" % { | ||
"instance": instance_column, | ||
"id": id_column, | ||
"table": table, | ||
"agg": "MAX" if self._positive else "-MIN", | ||
} | ||
|
||
): | ||
cur = db_conn.cursor() | ||
cur.execute(sql) | ||
|
||
# `cur` is an iterable over returned rows, which are 2-tuples. | ||
current_positions = dict(cur) | ||
# Load the current positions of all writers for the stream. | ||
if self._writers: | ||
sql = """ | ||
SELECT instance_name, stream_id FROM stream_positions | ||
WHERE stream_name = ? | ||
""" | ||
sql = self._db.engine.convert_param_style(sql) | ||
|
||
cur.close() | ||
cur.execute(sql, (self._stream_name,)) | ||
|
||
self._current_positions = { | ||
instance: stream_id * self._return_factor | ||
for instance, stream_id in cur | ||
if instance in self._writers | ||
} | ||
|
||
# We set the `_persisted_upto_position` to be the minimum of all current | ||
# positions. If empty we use the max stream ID from the DB table. | ||
min_stream_id = min(self._current_positions.values(), default=None) | ||
|
||
if min_stream_id is None: | ||
sql = """ | ||
SELECT COALESCE(%(agg)s(%(id)s), 1) FROM %(table)s | ||
""" % { | ||
"id": id_column, | ||
"table": table, | ||
"agg": "MAX" if self._positive else "-MIN", | ||
} | ||
cur.execute(sql) | ||
(stream_id,) = cur.fetchone() | ||
self._persisted_upto_position = stream_id | ||
else: | ||
# If we have a min_stream_id then we pull out everything greater | ||
# than it from the DB so that we can prefill | ||
# `_known_persisted_positions` and get a more accurate | ||
# `_persisted_upto_position`. | ||
# | ||
# We also check if any of the later rows are from this instance, in | ||
# which case we use that for this instance's current position. This | ||
# is to handle the case where we didn't finish persisting to the | ||
# stream positions table before restart (or the stream position | ||
# table otherwise got out of date). | ||
Comment on lines
+306
to
+310
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this true of any instance in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We know that if the current instance has just been restarted then we don't have any rows that are currently being persisted, so its safe to set the current position of itself to the max stream ID. Other instances may not have been restarted so may still be persisting things. (We don't just set the current position of the instance to the max stream ID as in future we want every entry in |
||
|
||
sql = """ | ||
SELECT %(instance)s, %(id)s FROM %(table)s | ||
WHERE ? %(cmp)s %(id)s | ||
""" % { | ||
"id": id_column, | ||
"table": table, | ||
"instance": instance_column, | ||
"cmp": "<=" if self._positive else ">=", | ||
} | ||
sql = self._db.engine.convert_param_style(sql) | ||
cur.execute(sql, (min_stream_id,)) | ||
|
||
self._persisted_upto_position = min_stream_id | ||
|
||
with self._lock: | ||
for (instance, stream_id,) in cur: | ||
stream_id = self._return_factor * stream_id | ||
self._add_persisted_position(stream_id) | ||
|
||
return current_positions | ||
if instance == self._instance_name: | ||
self._current_positions[instance] = stream_id | ||
|
||
cur.close() | ||
|
||
def _load_next_id_txn(self, txn) -> int: | ||
return self._sequence_gen.get_next_id_txn(txn) | ||
|
@@ -316,6 +373,21 @@ def get_next_txn(self, txn: LoggingTransaction): | |
txn.call_after(self._mark_id_as_finished, next_id) | ||
txn.call_on_exception(self._mark_id_as_finished, next_id) | ||
|
||
# Update the `stream_positions` table with newly updated stream | ||
# ID (unless self._writers is not set in which case we don't | ||
# bother, as nothing will read it). | ||
# | ||
# We only do this on the success path so that the persisted current | ||
# position points to a persited row with the correct instance name. | ||
if self._writers: | ||
txn.call_after( | ||
run_as_background_process, | ||
"MultiWriterIdGenerator._update_table", | ||
self._db.runInteraction, | ||
"MultiWriterIdGenerator._update_table", | ||
self._update_stream_positions_table_txn, | ||
) | ||
Comment on lines
+383
to
+389
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any issues with this being a different transaction? (What if this transaction fails?) I suspect it is OK since we always update to the largest position anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to do it after we've marked the associated ID as "persisted", which happens after the transaction has already finished. Though a follow up question of: can we mark the ID as persisted before we finish the transaction? The answer to that is probably yes if we faff around a bit. One thing to note is that the only stream that actually uses |
||
|
||
return self._return_factor * next_id | ||
|
||
def _mark_id_as_finished(self, next_id: int): | ||
|
@@ -447,6 +519,28 @@ def _add_persisted_position(self, new_id: int): | |
# do. | ||
break | ||
|
||
def _update_stream_positions_table_txn(self, txn): | ||
"""Update the `stream_positions` table with newly persisted position. | ||
""" | ||
|
||
if not self._writers: | ||
return | ||
Comment on lines
+526
to
+527
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't really matter, but this is checked in the callers too. I'm assuming this was just double checking? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, just in case somewhere forgets to do the check. |
||
|
||
# We upsert the value, ensuring on conflict that we always increase the | ||
# value (or decrease if stream goes backwards). | ||
sql = """ | ||
INSERT INTO stream_positions (stream_name, instance_name, stream_id) | ||
VALUES (?, ?, ?) | ||
ON CONFLICT (stream_name, instance_name) | ||
DO UPDATE SET | ||
stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id) | ||
""" % { | ||
"agg": "GREATEST" if self._positive else "LEAST", | ||
} | ||
|
||
pos = (self.get_current_token_for_writer(self._instance_name),) | ||
txn.execute(sql, (self._stream_name, self._instance_name, pos)) | ||
|
||
|
||
@attr.s(slots=True) | ||
class _AsyncCtxManagerWrapper: | ||
|
@@ -503,4 +597,16 @@ async def __aexit__(self, exc_type, exc, tb): | |
if exc_type is not None: | ||
return False | ||
|
||
# Update the `stream_positions` table with newly updated stream | ||
# ID (unless self._writers is not set in which case we don't | ||
# bother, as nothing will read it). | ||
# | ||
# We only do this on the success path so that the persisted current | ||
# position points to a persisted row with the correct instance name. | ||
if self.id_gen._writers: | ||
await self.id_gen._db.runInteraction( | ||
"MultiWriterIdGenerator._update_table", | ||
self.id_gen._update_stream_positions_table_txn, | ||
) | ||
|
||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By excluding only the known writers here could there be a dataloss situation when removing a writer, if that writer is the minimum position?
If you have writers:
And then remove "C",
_current_positions
will be {A: 15, B: 17} andmin_stream_id
will be 15. I'm not sure if this is OK or not?It seems you also don't want to include writers that you no longer care about or else you can end up in a situation where you have an old writer always saying it is far behind, which I'm guessing this if-statement checks against?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per discussion elsewhere: this is OK because if
C
has been removed from the list it means its been turned off, and so we know its up to date.It's worth noting that these positions aren't really "where
C
has gotten up to in the rooms", but instead is more "whereC
is currently persisting events". I.e. we're not worried that if we removeC
from the deployment thenA
orB
will start persisting at position 12, as eitherC
finished persisting a row at 12 or the request gets retried andA
orB
will persist it with a new position.