diff --git a/changelog.d/6375.bugfix b/changelog.d/6375.bugfix new file mode 100644 index 000000000000..d023b49181cb --- /dev/null +++ b/changelog.d/6375.bugfix @@ -0,0 +1 @@ +Fix `to_device` stream ID getting reset every time Synapse restarts, which had the potential to cause unable to decrypt errors. \ No newline at end of file diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 96cd0fb77ade..f04aad074339 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -358,21 +358,8 @@ def add_messages_txn(txn, now_ms, stream_id): def _add_messages_to_local_device_inbox_txn( self, txn, stream_id, messages_by_user_then_device ): - # Compatible method of performing an upsert - sql = "SELECT stream_id FROM device_max_stream_id" - - txn.execute(sql) - rows = txn.fetchone() - if rows: - db_stream_id = rows[0] - if db_stream_id < stream_id: - # Insert the new stream_id - sql = "UPDATE device_max_stream_id SET stream_id = ?" - else: - # No rows, perform an insert - sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)" - - txn.execute(sql, (stream_id,)) + sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?" + txn.execute(sql, (stream_id, stream_id)) local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): diff --git a/synapse/storage/data_stores/main/schema/delta/56/device_stream_id.sql b/synapse/storage/data_stores/main/schema/delta/56/device_stream_id.sql new file mode 100644 index 000000000000..ae261a70a28b --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/device_stream_id.sql @@ -0,0 +1,5 @@ +INSERT INTO device_max_stream_id (stream_id) +SELECT 0 +WHERE NOT EXISTS ( + SELECT 1 FROM device_max_stream_id +); \ No newline at end of file